Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/05/01 21:59:03 UTC
svn commit: r1332858 - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/mapreduce/
src/test/org/apache/hcatalog/mapreduce/
Author: toffer
Date: Tue May 1 21:59:03 2012
New Revision: 1332858
URL: http://svn.apache.org/viewvc?rev=1332858&view=rev
Log:
HCAT-36 Support Writing Out to Multiple Tables in HCatOutputFormat (rohini via toffer)
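For reference, a minimal driver sketch of the new API, assembled from the MultiOutputFormat javadoc and the TestHCatMultiOutputFormat test added below. The table names, partition values, schema map and mapper class are illustrative only and are not part of this patch:

    // One job, two HCatalog tables: each alias gets its own OutputFormat configuration.
    Job job = new Job(conf, "multi-table-write");
    job.setMapperClass(MyMapper.class);                    // hypothetical mapper
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(MultiOutputFormat.class);
    job.setNumReduceTasks(0);

    JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
    for (String table : new String[] {"table1", "table2"}) {   // illustrative names
        configurer.addOutputFormat(table, HCatOutputFormat.class,
                BytesWritable.class, HCatRecord.class);
        // HCatOutputFormat is configured on the per-alias Job copy, not on 'job'.
        HCatOutputFormat.setOutput(configurer.getJob(table),
                OutputJobInfo.create("default", table, partitionValues));
        HCatOutputFormat.setSchema(configurer.getJob(table), schemaMap.get(table));
    }
    configurer.configure();
    job.waitForCompletion(true);

    // Inside the mapper, a record is routed to a table by its alias:
    //   MultiOutputFormat.write("table1", null, record, context);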
Added:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1332858&r1=1332857&r2=1332858&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue May 1 21:59:03 2012
@@ -72,6 +72,8 @@ Release 0.4.0 - Unreleased
HCAT-240. Changes to HCatOutputFormat to make it use SerDes instead of StorageDriver (toffer)
NEW FEATURES
+ HCAT-36 Support Writing Out to Multiple Tables in HCatOutputFormat (rohini via toffer)
+
HCAT-342 enable hcat to work with hive bigtop rpm (thejas via gates)
HCAT-287 Add data api to HCatalog (hashutosh)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1332858&r1=1332857&r2=1332858&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Tue May 1 21:59:03 2012
@@ -48,6 +48,8 @@ import org.apache.hcatalog.data.schema.H
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
@@ -64,6 +66,7 @@ import java.util.Map.Entry;
*/
class FileOutputCommitterContainer extends OutputCommitterContainer {
+ private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class);
private final boolean dynamicPartitioningUsed;
private boolean partitionsDiscovered;
@@ -436,20 +439,18 @@ class FileOutputCommitterContainer exten
partition.setParameters(params);
- // Sets permissions and group name on partition dirs.
+ // Sets permissions and group name on partition dirs and files.
Path partPath = new Path(partLocnRoot);
- for(FieldSchema partKey : table.getPartitionKeys()){
- partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
-// LOG.info("Setting perms for "+partPath.toString());
- fs.setPermission(partPath, perms);
- try{
- fs.setOwner(partPath, null, grpName);
- } catch(AccessControlException ace){
- // log the messages before ignoring. Currently, logging is not built in Hcatalog.
-// LOG.warn(ace);
+ int i = 0;
+ for (FieldSchema partKey : table.getPartitionKeys()) {
+ if (i++ != 0) {
+ applyGroupAndPerms(fs, partPath, perms, grpName, false);
}
+ partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
}
+ // Apply the group and permissions to the leaf partition and files.
+ applyGroupAndPerms(fs, partPath, perms, grpName, true);
if (dynamicPartitioningUsed){
String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
if (harProcessor.isEnabled()){
@@ -466,7 +467,30 @@ class FileOutputCommitterContainer exten
return partition;
}
-
+ private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission permission,
+ String group, boolean recursive)
+ throws IOException {
+ fs.setPermission(dir, permission);
+ try {
+ fs.setOwner(dir, null, group);
+ } catch (AccessControlException ace) {
+ LOG.warn("Error changing group of " + dir, ace);
+ }
+ if (recursive) {
+ for (FileStatus fileStatus : fs.listStatus(dir)) {
+ if (fileStatus.isDir()) {
+ applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive);
+ } else {
+ fs.setPermission(fileStatus.getPath(), permission);
+ try {
+ fs.setOwner(fileStatus.getPath(), null, group);
+ } catch (AccessControlException ace) {
+ LOG.warn("Error changing group of " + fileStatus.getPath(), ace);
+ }
+ }
+ }
+ }
+ }
private String getFinalDynamicPartitionDestination(Table table, Map<String,String> partKVs) {
// file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA ->
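The committer change above now applies the table's group and permissions to every partition directory level and, recursively, to the leaf partition directory and the files inside it. A small verification sketch under assumed paths follows; the helper below is not part of the patch, and TestHCatMultiOutputFormat later in this commit makes the same checks with assertions:

    // Sketch only: print the group/permissions this change is expected to set.
    static void printPartitionPerms(FileSystem fs, Path tableDir, Path leafPartition)
            throws IOException {
        // Each partition directory level between the table dir and the leaf ...
        for (Path p = leafPartition; !p.equals(tableDir); p = p.getParent()) {
            FileStatus stat = fs.getFileStatus(p);
            System.out.println(p + " -> " + stat.getPermission() + " " + stat.getGroup());
        }
        // ... and, after this change, the part files inside the leaf partition too.
        for (FileStatus stat : fs.listStatus(leafPartition)) {
            System.out.println(stat.getPath() + " -> " + stat.getPermission()
                    + " " + stat.getGroup());
        }
    }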
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java?rev=1332858&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java Tue May 1 21:59:03 2012
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * The MultiOutputFormat class simplifies writing output data to multiple
+ * outputs.
+ * <p>
+ * Multiple output formats can be defined, each with its own
+ * <code>OutputFormat</code> class, key class and value class. Any
+ * configuration on these output format classes can be done without interfering
+ * with another output format's configuration.
+ * <p>
+ * Usage pattern for job submission:
+ *
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPath(job, inDir);
+ *
+ * job.setMapperClass(WordCountMap.class);
+ * job.setReducerClass(WordCountReduce.class);
+ * job.setInputFormatClass(TextInputFormat.class);
+ * job.setOutputFormatClass(MultiOutputFormat.class);
+ * // Need not define OutputKeyClass and OutputValueClass. They default to
+ * // Writable.class
+ * job.setMapOutputKeyClass(Text.class);
+ * job.setMapOutputValueClass(IntWritable.class);
+ *
+ *
+ * // Create a JobConfigurer that will configure the job with the multiple
+ * // output format information.
+ * JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+ *
+ * // Defines an additional text based output 'text' for the job.
+ * // Any configuration for the defined OutputFormat should be done with
+ * // the Job obtained with configurer.getJob() method.
+ * configurer.addOutputFormat("text", TextOutputFormat.class,
+ * IntWritable.class, Text.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("text"), textOutDir);
+ *
+ * // Defines an additional sequence-file based output 'sequence' for the job
+ * configurer.addOutputFormat("sequence", SequenceFileOutputFormat.class,
+ * Text.class, IntWritable.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("sequence"), seqOutDir);
+ * ...
+ * // configure method to be called on the JobConfigurer once all the
+ * // output formats have been defined and configured.
+ * configurer.configure();
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ *
+ * <pre>
+ * public class WordCountReduce extends
+ * Reducer<Text, IntWritable, Writable, Writable> {
+ *
+ * private IntWritable count = new IntWritable();
+ *
+ * public void reduce(Text word, Iterable<IntWritable> values,
+ * Context context)
+ * throws IOException {
+ * int sum = 0;
+ * for (IntWritable val : values) {
+ * sum += val.get();
+ * }
+ * count.set(sum);
+ * MultiOutputFormat.write("text", count, word, context);
+ * MultiOutputFormat.write("sequence", word, count, context);
+ * }
+ *
+ * }
+ *
+ * </pre>
+ *
+ * Map only jobs:
+ * <p>
+ * MultiOutputFormat.write("output", key, value, context); can be called similar
+ * to a reducer in map only jobs.
+ *
+ */
+public class MultiOutputFormat extends OutputFormat<Writable, Writable> {
+
+ private static final String MO_ALIASES = "mapreduce.multiout.aliases";
+ private static final String MO_ALIAS = "mapreduce.multiout.alias";
+ private static final String CONF_KEY_DELIM = "%%";
+ private static final String CONF_VALUE_DELIM = ";;";
+ private static final String COMMA_DELIM = ",";
+ private static final List<String> configsToOverride = new ArrayList<String>();
+ private static final List<String> configsToMerge = new ArrayList<String>();
+
+ static {
+ configsToOverride.add("mapred.output.dir");
+ configsToMerge.add(JobContext.JOB_NAMENODES);
+ configsToMerge.add("tmpfiles");
+ configsToMerge.add("tmpjars");
+ configsToMerge.add("tmparchives");
+ }
+
+ /**
+ * Get a JobConfigurer instance that will support configuration of the job
+ * for multiple output formats.
+ *
+ * @param job the mapreduce job to be submitted
+ * @return a JobConfigurer instance for the given job
+ */
+ public static JobConfigurer createConfigurer(Job job) {
+ return JobConfigurer.create(job);
+ }
+
+ /**
+ * Write the output key and value using the OutputFormat defined by the
+ * alias.
+ *
+ * @param alias the name given to the OutputFormat configuration
+ * @param key the output key to be written
+ * @param value the output value to be written
+ * @param context the Mapper or Reducer Context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static <K, V> void write(String alias, K key, V value, TaskInputOutputContext context)
+ throws IOException, InterruptedException {
+ KeyValue<K, V> keyval = new KeyValue<K, V>(key, value);
+ context.write(new Text(alias), keyval);
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+ for (String alias : getOutputFormatAliases(context)) {
+ JobContext aliasContext = getJobContext(alias, context);
+ OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
+ outputFormat.checkOutputSpecs(aliasContext);
+ // Copy credentials and any new config added back to JobContext
+ context.getCredentials().addAll(aliasContext.getCredentials());
+ setAliasConf(alias, context, aliasContext);
+ }
+ }
+
+ @Override
+ public RecordWriter<Writable, Writable> getRecordWriter(TaskAttemptContext context)
+ throws IOException,
+ InterruptedException {
+ return new MultiRecordWriter(context);
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new MultiOutputCommitter(context);
+ }
+
+ private static OutputFormat<?, ?> getOutputFormatInstance(JobContext context) {
+ OutputFormat<?, ?> outputFormat;
+ try {
+ outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(),
+ context.getConfiguration());
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ return outputFormat;
+ }
+
+ private static String[] getOutputFormatAliases(JobContext context) {
+ return context.getConfiguration().getStrings(MO_ALIASES);
+ }
+
+ /**
+ * Compare the aliasContext with userJob and add the differing configuration
+ * as mapreduce.multiout.alias.<aliasname>.conf to the userJob.
+ * <p>
+ * Merge configs like tmpjars, tmpfiles, tmparchives and
+ * mapreduce.job.hdfs-servers that are directly handled by JobClient and add
+ * them to userJob.
+ * <p>
+ * Add mapred.output.dir config to userJob.
+ *
+ * @param alias alias name associated with an OutputFormat
+ * @param userJob reference to Job that the user is going to submit
+ * @param aliasContext JobContext populated with OutputFormat related
+ * configuration.
+ */
+ private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) {
+ Configuration userConf = userJob.getConfiguration();
+ StringBuilder builder = new StringBuilder();
+ for (Entry<String, String> conf : aliasContext.getConfiguration()) {
+ String key = conf.getKey();
+ String value = conf.getValue();
+ String jobValue = userConf.getRaw(key);
+ if (jobValue == null || !jobValue.equals(value)) {
+ if (configsToMerge.contains(key)) {
+ String mergedValue = getMergedConfValue(jobValue, value);
+ userConf.set(key, mergedValue);
+ } else {
+ if (configsToOverride.contains(key)) {
+ userConf.set(key, value);
+ }
+ builder.append(key).append(CONF_KEY_DELIM).append(value)
+ .append(CONF_VALUE_DELIM);
+ }
+ }
+ }
+ builder.delete(builder.length() - CONF_VALUE_DELIM.length(), builder.length());
+ userConf.set(getAliasConfName(alias), builder.toString());
+ }
+
+ private static String getMergedConfValue(String originalValues, String newValues) {
+ if (originalValues == null) {
+ return newValues;
+ }
+ Set<String> mergedValues = new HashSet<String>();
+ mergedValues.addAll(StringUtils.getStringCollection(originalValues));
+ mergedValues.addAll(StringUtils.getStringCollection(newValues));
+ StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2);
+ for (String value : mergedValues) {
+ builder.append(value).append(COMMA_DELIM);
+ }
+ return builder.substring(0, builder.length() - COMMA_DELIM.length());
+ }
+
+ private static JobContext getJobContext(String alias, JobContext context) {
+ String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
+ JobContext aliasContext = new JobContext(context.getConfiguration(), context.getJobID());
+ addToConfig(aliasConf, aliasContext.getConfiguration());
+ return aliasContext;
+ }
+
+ private static TaskAttemptContext getTaskContext(String alias, TaskAttemptContext context) {
+ String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
+ TaskAttemptContext aliasContext = new TaskAttemptContext(context.getConfiguration(),
+ context.getTaskAttemptID());
+ addToConfig(aliasConf, aliasContext.getConfiguration());
+ return aliasContext;
+ }
+
+ private static String getAliasConfName(String alias) {
+ return MO_ALIAS + "." + alias + ".conf";
+ }
+
+ private static void addToConfig(String aliasConf, Configuration conf) {
+ String[] config = aliasConf.split(CONF_KEY_DELIM + "|" + CONF_VALUE_DELIM);
+ for (int i = 0; i < config.length; i += 2) {
+ conf.set(config[i], config[i + 1]);
+ }
+ }
+
+ /**
+ * Class that supports configuration of the job for multiple output formats.
+ */
+ public static class JobConfigurer {
+
+ private final Job job;
+ private Map<String, Job> outputConfigs = new LinkedHashMap<String, Job>();
+
+ private JobConfigurer(Job job) {
+ this.job = job;
+ }
+
+ private static JobConfigurer create(Job job) {
+ JobConfigurer configurer = new JobConfigurer(job);
+ return configurer;
+ }
+
+ /**
+ * Add an OutputFormat configuration to the Job with an alias name.
+ *
+ * @param alias the name to be given to the OutputFormat configuration
+ * @param outputFormatClass OutputFormat class
+ * @param keyClass the key class for the output data
+ * @param valueClass the value class for the output data
+ * @throws IOException
+ */
+ public void addOutputFormat(String alias,
+ Class<? extends OutputFormat> outputFormatClass,
+ Class<?> keyClass, Class<?> valueClass) throws IOException {
+ Job copy = new Job(this.job.getConfiguration());
+ outputConfigs.put(alias, copy);
+ copy.setOutputFormatClass(outputFormatClass);
+ copy.setOutputKeyClass(keyClass);
+ copy.setOutputValueClass(valueClass);
+ }
+
+ /**
+ * Get the Job configuration for an OutputFormat defined by the alias
+ * name. The job returned by this method should be passed to the
+ * OutputFormat for any configuration instead of the Job that will be
+ * submitted to the JobClient.
+ *
+ * @param alias the name used for the OutputFormat during
+ * addOutputFormat
+ * @return the Job configuration copy associated with the alias
+ */
+ public Job getJob(String alias) {
+ Job copy = outputConfigs.get(alias);
+ if (copy == null) {
+ throw new IllegalArgumentException("OutputFormat with alias " + alias
+ + " has not beed added");
+ }
+ return copy;
+ }
+
+ /**
+ * Configure the job with the multiple output formats added. This method
+ * should be called after all the output formats have been added and
+ * configured and before the job submission.
+ */
+ public void configure() {
+ StringBuilder aliases = new StringBuilder();
+ Configuration jobConf = job.getConfiguration();
+ for (Entry<String, Job> entry : outputConfigs.entrySet()) {
+ // Copy credentials
+ job.getCredentials().addAll(entry.getValue().getCredentials());
+ String alias = entry.getKey();
+ aliases.append(alias).append(COMMA_DELIM);
+ // Store the differing configuration for each alias in the job
+ // as a setting.
+ setAliasConf(alias, job, entry.getValue());
+ }
+ aliases.delete(aliases.length() - COMMA_DELIM.length(), aliases.length());
+ jobConf.set(MO_ALIASES, aliases.toString());
+ }
+
+ }
+
+ private static class KeyValue<K, V> implements Writable {
+ private final K key;
+ private final V value;
+
+ public KeyValue(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // Ignore. Not required as this will never be
+ // serialized/deserialized.
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // Ignore. Not required as this will never be
+ // serialized/deserialized.
+ }
+ }
+
+ private static class MultiRecordWriter extends RecordWriter<Writable, Writable> {
+
+ private final Map<String, BaseRecordWriterContainer> baseRecordWriters;
+
+ public MultiRecordWriter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ baseRecordWriters = new LinkedHashMap<String, BaseRecordWriterContainer>();
+ String[] aliases = getOutputFormatAliases(context);
+ for (String alias : aliases) {
+ TaskAttemptContext aliasContext = getTaskContext(alias, context);
+ Configuration aliasConf = aliasContext.getConfiguration();
+ // Create output directory if not already created.
+ String outDir = aliasConf.get("mapred.output.dir");
+ if (outDir != null) {
+ Path outputDir = new Path(outDir);
+ FileSystem fs = outputDir.getFileSystem(aliasConf);
+ if (!fs.exists(outputDir)) {
+ fs.mkdirs(outputDir);
+ }
+ }
+ OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
+ baseRecordWriters.put(alias,
+ new BaseRecordWriterContainer(outputFormat.getRecordWriter(aliasContext),
+ aliasContext));
+ }
+ }
+
+ @Override
+ public void write(Writable key, Writable value) throws IOException, InterruptedException {
+ Text _key = (Text) key;
+ KeyValue _value = (KeyValue) value;
+ String alias = new String(_key.getBytes(), 0, _key.getLength());
+ BaseRecordWriterContainer baseRWContainer = baseRecordWriters.get(alias);
+ if (baseRWContainer == null) {
+ throw new IllegalArgumentException("OutputFormat with alias " + alias
+ + " has not been added");
+ }
+ baseRWContainer.getRecordWriter().write(_value.getKey(), _value.getValue());
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ for (BaseRecordWriterContainer baseRWContainer : baseRecordWriters.values()) {
+ baseRWContainer.getRecordWriter().close(baseRWContainer.getContext());
+ }
+ }
+
+ }
+
+ private static class BaseRecordWriterContainer {
+
+ private final RecordWriter recordWriter;
+ private final TaskAttemptContext context;
+
+ public BaseRecordWriterContainer(RecordWriter recordWriter, TaskAttemptContext context) {
+ this.recordWriter = recordWriter;
+ this.context = context;
+ }
+
+ public RecordWriter getRecordWriter() {
+ return recordWriter;
+ }
+
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+ private class MultiOutputCommitter extends OutputCommitter {
+
+ private final Map<String, BaseOutputCommitterContainer> outputCommitters;
+
+ public MultiOutputCommitter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ outputCommitters = new LinkedHashMap<String, MultiOutputFormat.BaseOutputCommitterContainer>();
+ String[] aliases = getOutputFormatAliases(context);
+ for (String alias : aliases) {
+ TaskAttemptContext aliasContext = getTaskContext(alias, context);
+ OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext)
+ .getOutputCommitter(aliasContext);
+ outputCommitters.put(alias,
+ new BaseOutputCommitterContainer(baseCommitter, aliasContext));
+ }
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().setupJob(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().setupTask(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+ boolean needTaskCommit = false;
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ needTaskCommit = needTaskCommit
+ || outputContainer.getBaseCommitter().needsTaskCommit(
+ outputContainer.getContext());
+ }
+ return needTaskCommit;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().commitTask(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().abortTask(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().commitJob(outputContainer.getContext());
+ }
+ }
+
+ @Override
+ public void abortJob(JobContext jobContext, State state) throws IOException {
+ for (String alias : outputCommitters.keySet()) {
+ BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+ outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state);
+ }
+ }
+ }
+
+ private static class BaseOutputCommitterContainer {
+
+ private final OutputCommitter outputCommitter;
+ private final TaskAttemptContext context;
+
+ public BaseOutputCommitterContainer(OutputCommitter outputCommitter,
+ TaskAttemptContext context) {
+ this.outputCommitter = outputCommitter;
+ this.context = context;
+ }
+
+ public OutputCommitter getBaseCommitter() {
+ return outputCommitter;
+ }
+
+ public TaskAttemptContext getContext() {
+ return context;
+ }
+ }
+
+}
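A standalone sketch of the alias-conf round trip used by setAliasConf() and addToConfig() above: per-alias settings that differ from the user job are packed into a single string as key%%value;;key%%value and stored under mapreduce.multiout.alias.<alias>.conf. The class and method names below are invented for illustration; only the delimiters and the split logic come from the patch:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class AliasConfEncodingSketch {
        private static final String CONF_KEY_DELIM = "%%";
        private static final String CONF_VALUE_DELIM = ";;";

        // Pack differing settings the way setAliasConf() does.
        static String encode(Map<String, String> diff) {
            StringBuilder builder = new StringBuilder();
            for (Map.Entry<String, String> e : diff.entrySet()) {
                builder.append(e.getKey()).append(CONF_KEY_DELIM)
                       .append(e.getValue()).append(CONF_VALUE_DELIM);
            }
            return builder.length() == 0 ? ""
                    : builder.substring(0, builder.length() - CONF_VALUE_DELIM.length());
        }

        // Unpack the way addToConfig() does: split on either delimiter and
        // treat the tokens as alternating keys and values.
        static Map<String, String> decode(String aliasConf) {
            Map<String, String> conf = new LinkedHashMap<String, String>();
            String[] config = aliasConf.split(CONF_KEY_DELIM + "|" + CONF_VALUE_DELIM);
            for (int i = 0; i < config.length; i += 2) {
                conf.put(config[i], config[i + 1]);
            }
            return conf;
        }

        public static void main(String[] args) {
            Map<String, String> diff = new LinkedHashMap<String, String>();
            diff.put("mapred.output.dir", "/out/text");
            diff.put("mapreduce.output.textoutputformat.separator", ",");
            String packed = encode(diff);
            System.out.println(packed);                      // mapred.output.dir%%/out/text;;...
            System.out.println(decode(packed).equals(diff)); // true: values round-trip
        }
    }

Note that values containing "%%" or ";;" would not survive this encoding; the committed code carries the same constraint.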
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java?rev=1332858&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java Tue May 1 21:59:03 2012
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHCatMultiOutputFormat {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestHCatMultiOutputFormat.class);
+
+ private static final String DATABASE = "default";
+ private static final String[] tableNames = {"test1", "test2", "test3"};
+ private static final String[] tablePerms = {"755", "750", "700"};
+ private static Path warehousedir = null;
+ private static HashMap<String, HCatSchema> schemaMap = new HashMap<String, HCatSchema>();
+ private static HiveMetaStoreClient hmsc;
+ private static MiniMRCluster mrCluster;
+ private static Configuration mrConf;
+ private static HiveConf hiveConf;
+ private static File workDir;
+
+ private static final String msPort = "20199";
+ private static Thread t;
+
+ static {
+ schemaMap.put(tableNames[0], new HCatSchema(ColumnHolder.hCattest1Cols));
+ schemaMap.put(tableNames[1], new HCatSchema(ColumnHolder.hCattest2Cols));
+ schemaMap.put(tableNames[2], new HCatSchema(ColumnHolder.hCattest3Cols));
+ }
+
+ private static class RunMS implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ String warehouseConf = HiveConf.ConfVars.METASTOREWAREHOUSE.varname + "="
+ + warehousedir.toString();
+ HiveMetaStore.main(new String[] {"-v", "-p", msPort, "--hiveconf", warehouseConf});
+ } catch (Throwable t) {
+ System.err.println("Exiting. Got exception from metastore: " + t.getMessage());
+ }
+ }
+
+ }
+
+ /**
+ * Private class which holds all the data for the test cases
+ */
+ private static class ColumnHolder {
+
+ private static ArrayList<HCatFieldSchema> hCattest1Cols = new ArrayList<HCatFieldSchema>();
+ private static ArrayList<HCatFieldSchema> hCattest2Cols = new ArrayList<HCatFieldSchema>();
+ private static ArrayList<HCatFieldSchema> hCattest3Cols = new ArrayList<HCatFieldSchema>();
+
+ private static ArrayList<FieldSchema> partitionCols = new ArrayList<FieldSchema>();
+ private static ArrayList<FieldSchema> test1Cols = new ArrayList<FieldSchema>();
+ private static ArrayList<FieldSchema> test2Cols = new ArrayList<FieldSchema>();
+ private static ArrayList<FieldSchema> test3Cols = new ArrayList<FieldSchema>();
+
+ private static HashMap<String, List<FieldSchema>> colMapping = new HashMap<String, List<FieldSchema>>();
+
+ static {
+ try {
+ FieldSchema keyCol = new FieldSchema("key", Constants.STRING_TYPE_NAME, "");
+ test1Cols.add(keyCol);
+ test2Cols.add(keyCol);
+ test3Cols.add(keyCol);
+ hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+ hCattest2Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+ hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+ FieldSchema valueCol = new FieldSchema("value", Constants.STRING_TYPE_NAME, "");
+ test1Cols.add(valueCol);
+ test3Cols.add(valueCol);
+ hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol));
+ hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol));
+ FieldSchema extraCol = new FieldSchema("extra", Constants.STRING_TYPE_NAME, "");
+ test3Cols.add(extraCol);
+ hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(extraCol));
+ colMapping.put("test1", test1Cols);
+ colMapping.put("test2", test2Cols);
+ colMapping.put("test3", test3Cols);
+ } catch (HCatException e) {
+ LOG.error("Error in setting up schema fields for the table", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ static {
+ partitionCols.add(new FieldSchema("ds", Constants.STRING_TYPE_NAME, ""));
+ partitionCols.add(new FieldSchema("cluster", Constants.STRING_TYPE_NAME, ""));
+ }
+ }
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ String testDir = System.getProperty("test.data.dir", "./");
+ testDir = testDir + "/test_multitable_" + Math.abs(new Random().nextLong()) + "/";
+ workDir = new File(new File(testDir).getCanonicalPath());
+ FileUtil.fullyDelete(workDir);
+ workDir.mkdirs();
+
+ warehousedir = new Path(workDir + "/warehouse");
+
+ // Run hive metastore server
+ t = new Thread(new RunMS());
+ t.start();
+
+ // LocalJobRunner does not work with mapreduce OutputCommitter. So need
+ // to use MiniMRCluster. MAPREDUCE-2350
+ Configuration conf = new Configuration(true);
+ FileSystem fs = FileSystem.get(conf);
+ System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
+ mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
+ mrConf = mrCluster.createJobConf();
+ fs.mkdirs(warehousedir);
+
+ initializeSetup();
+ }
+
+ private static void initializeSetup() throws Exception {
+
+ hiveConf = new HiveConf(mrConf, TestHCatMultiOutputFormat.class);
+ hiveConf.set("hive.metastore.local", "false");
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
+ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+
+ hiveConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+ System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousedir.toString());
+ try {
+ hmsc = new HiveMetaStoreClient(hiveConf, null);
+ initalizeTables();
+ } catch (Throwable e) {
+ LOG.error("Exception encountered while setting up testcase", e);
+ throw new Exception(e);
+ } finally {
+ hmsc.close();
+ }
+ }
+
+ private static void initalizeTables() throws Exception {
+ for (String table : tableNames) {
+ try {
+ if (hmsc.getTable(DATABASE, table) != null) {
+ hmsc.dropTable(DATABASE, table);
+ }
+ } catch (NoSuchObjectException ignored) {
+ }
+ }
+ for (int i = 0; i < tableNames.length; i++) {
+ createTable(tableNames[i], tablePerms[i]);
+ }
+ }
+
+ private static void createTable(String tableName, String tablePerm) throws Exception {
+ Table tbl = new Table();
+ tbl.setDbName(DATABASE);
+ tbl.setTableName(tableName);
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(ColumnHolder.colMapping.get(tableName));
+ tbl.setSd(sd);
+ sd.setParameters(new HashMap<String, String>());
+ sd.setSerdeInfo(new SerDeInfo());
+ sd.getSerdeInfo().setName(tbl.getTableName());
+ sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+ sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
+ sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
+ sd.getSerdeInfo().getParameters().put(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+ sd.getSerdeInfo().setSerializationLib(
+ org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+ tbl.setPartitionKeys(ColumnHolder.partitionCols);
+
+ hmsc.createTable(tbl);
+ FileSystem fs = FileSystem.get(mrConf);
+ fs.setPermission(new Path(warehousedir, tableName), new FsPermission(tablePerm));
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ FileUtil.fullyDelete(workDir);
+ FileSystem fs = FileSystem.get(mrConf);
+ if (fs.exists(warehousedir)) {
+ fs.delete(warehousedir, true);
+ }
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ }
+
+ /**
+ * Simple test case.
+ * <ol>
+ * <li>Submits a mapred job which writes out one fixed line to each of the tables</li>
+ * <li>uses hive fetch task to read the data and see if it matches what was written</li>
+ * </ol>
+ *
+ * @throws Exception if any error occurs
+ */
+ @Test
+ public void testOutputFormat() throws Throwable {
+ HashMap<String, String> partitionValues = new HashMap<String, String>();
+ partitionValues.put("ds", "1");
+ partitionValues.put("cluster", "ag");
+ ArrayList<OutputJobInfo> infoList = new ArrayList<OutputJobInfo>();
+ infoList.add(OutputJobInfo.create("default", tableNames[0], partitionValues));
+ infoList.add(OutputJobInfo.create("default", tableNames[1], partitionValues));
+ infoList.add(OutputJobInfo.create("default", tableNames[2], partitionValues));
+
+ Job job = new Job(hiveConf, "SampleJob");
+
+ job.setMapperClass(MyMapper.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(MultiOutputFormat.class);
+ job.setNumReduceTasks(0);
+
+ JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+
+ for (int i = 0; i < tableNames.length; i++) {
+ configurer.addOutputFormat(tableNames[i], HCatOutputFormat.class, BytesWritable.class,
+ HCatRecord.class);
+ HCatOutputFormat.setOutput(configurer.getJob(tableNames[i]), infoList.get(i));
+ HCatOutputFormat.setSchema(configurer.getJob(tableNames[i]),
+ schemaMap.get(tableNames[i]));
+ }
+ configurer.configure();
+
+ Path filePath = createInputFile();
+ FileInputFormat.addInputPath(job, filePath);
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ ArrayList<String> outputs = new ArrayList<String>();
+ for (String tbl : tableNames) {
+ outputs.add(getTableData(tbl, "default").get(0));
+ }
+ Assert.assertEquals("Comparing output of table " +
+ tableNames[0] + " is not correct", outputs.get(0), "a,a,1,ag");
+ Assert.assertEquals("Comparing output of table " +
+ tableNames[1] + " is not correct", outputs.get(1), "a,1,ag");
+ Assert.assertEquals("Comparing output of table " +
+ tableNames[2] + " is not correct", outputs.get(2), "a,a,extra,1,ag");
+
+ // Check permissions on partition dirs and files created
+ for (int i = 0; i < tableNames.length; i++) {
+ Path partitionFile = new Path(warehousedir + "/" + tableNames[i]
+ + "/ds=1/cluster=ag/part-m-00000");
+ FileSystem fs = partitionFile.getFileSystem(mrConf);
+ Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+ fs.getFileStatus(partitionFile).getPermission(),
+ new FsPermission(tablePerms[i]));
+ Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+ fs.getFileStatus(partitionFile.getParent()).getPermission(),
+ new FsPermission(tablePerms[i]));
+ Assert.assertEquals("File permissions of table " + tableNames[i] + " is not correct",
+ fs.getFileStatus(partitionFile.getParent().getParent()).getPermission(),
+ new FsPermission(tablePerms[i]));
+
+ }
+ LOG.info("File permissions verified");
+ }
+
+ /**
+ * Create an input file for the map
+ *
+ * @return absolute path of the file.
+ * @throws IOException if any error encountered
+ */
+ private Path createInputFile() throws IOException {
+ Path f = new Path(workDir + "/MultiTableInput.txt");
+ FileSystem fs = FileSystem.get(mrConf);
+ if (fs.exists(f)) {
+ fs.delete(f, true);
+ }
+ OutputStream out = fs.create(f);
+ for (int i = 0; i < 3; i++) {
+ out.write("a,a\n".getBytes());
+ }
+ out.close();
+ return f;
+ }
+
+ /**
+ * Method to fetch table data
+ *
+ * @param table table name
+ * @param database database
+ * @return table rows with columns separated by commas
+ * @throws Exception if any error occurs
+ */
+ private List<String> getTableData(String table, String database) throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.addResource("hive-site.xml");
+ ArrayList<String> results = new ArrayList<String>();
+ ArrayList<String> temp = new ArrayList<String>();
+ Hive hive = Hive.get(conf);
+ org.apache.hadoop.hive.ql.metadata.Table tbl = hive.getTable(database, table);
+ FetchWork work;
+ if (!tbl.getPartCols().isEmpty()) {
+ List<Partition> partitions = hive.getPartitions(tbl);
+ List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
+ List<String> partLocs = new ArrayList<String>();
+ for (Partition part : partitions) {
+ partLocs.add(part.getLocation());
+ partDesc.add(Utilities.getPartitionDesc(part));
+ }
+ work = new FetchWork(partLocs, partDesc);
+ work.setLimit(100);
+ } else {
+ work = new FetchWork(tbl.getDataLocation().toString(), Utilities.getTableDesc(tbl));
+ }
+ FetchTask task = new FetchTask();
+ task.setWork(work);
+ task.initialize(conf, null, null);
+ task.fetch(temp);
+ for (String str : temp) {
+ results.add(str.replace("\t", ","));
+ }
+ return results;
+ }
+
+ private static class MyMapper extends
+ Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
+ private int i = 0;
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ HCatRecord record = null;
+ String[] splits = value.toString().split(",");
+ switch (i) {
+ case 0:
+ record = new DefaultHCatRecord(2);
+ record.set(0, splits[0]);
+ record.set(1, splits[1]);
+ break;
+ case 1:
+ record = new DefaultHCatRecord(1);
+ record.set(0, splits[0]);
+ break;
+ case 2:
+ record = new DefaultHCatRecord(3);
+ record.set(0, splits[0]);
+ record.set(1, splits[1]);
+ record.set(2, "extra");
+ break;
+ default:
+ Assert.fail("This should not happen!!!!!");
+ }
+ MultiOutputFormat.write(tableNames[i], null, record, context);
+ i++;
+ }
+ }
+}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java?rev=1332858&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java Tue May 1 21:59:03 2012
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestMultiOutputFormat {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestMultiOutputFormat.class);
+ private static File workDir;
+ private static Configuration mrConf = null;
+ private static FileSystem fs = null;
+ private static MiniMRCluster mrCluster = null;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ createWorkDir();
+ Configuration conf = new Configuration(true);
+ fs = FileSystem.get(conf);
+ System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
+ // LocalJobRunner does not work with mapreduce OutputCommitter. So need
+ // to use MiniMRCluster. MAPREDUCE-2350
+ mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+ new JobConf(conf));
+ mrConf = mrCluster.createJobConf();
+ }
+
+ private static void createWorkDir() throws IOException {
+ String testDir = System.getProperty("test.data.dir", "./");
+ testDir = testDir + "/test_multiout_" + Math.abs(new Random().nextLong()) + "/";
+ workDir = new File(new File(testDir).getCanonicalPath());
+ FileUtil.fullyDelete(workDir);
+ workDir.mkdirs();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ }
+ FileUtil.fullyDelete(workDir);
+ }
+
+ /**
+ * A test job that reads an input file and outputs each word and the index at
+ * which it was encountered to a text file and a sequence file, with the key
+ * and value types swapped between the two outputs.
+ */
+ @Test
+ public void testMultiOutputFormatWithoutReduce() throws Throwable {
+ Job job = new Job(mrConf, "MultiOutNoReduce");
+ job.setMapperClass(MultiOutWordIndexMapper.class);
+ job.setJarByClass(this.getClass());
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(MultiOutputFormat.class);
+ job.setNumReduceTasks(0);
+
+ JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+ configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
+ configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class,
+ IntWritable.class);
+ Path outDir = new Path(workDir.getPath(), job.getJobName());
+ FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1"));
+ FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2"));
+
+ String fileContent = "Hello World";
+ String inputFile = createInputFile(fileContent);
+ FileInputFormat.setInputPaths(job, new Path(inputFile));
+
+ configurer.configure();
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ Path textOutPath = new Path(outDir, "out1/part-m-00000");
+ String[] textOutput = readFully(textOutPath).split("\n");
+ Path seqOutPath = new Path(outDir, "out2/part-m-00000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf);
+ Text key = new Text();
+ IntWritable value = new IntWritable();
+ String[] words = fileContent.split(" ");
+ Assert.assertEquals(words.length, textOutput.length);
+ LOG.info("Verifying file contents");
+ for (int i = 0; i < words.length; i++) {
+ Assert.assertEquals((i + 1) + "\t" + words[i], textOutput[i]);
+ reader.next(key, value);
+ Assert.assertEquals(words[i], key.toString());
+ Assert.assertEquals((i + 1), value.get());
+ }
+ Assert.assertFalse(reader.next(key, value));
+ }
+
+ /**
+ * A word count test job that reads an input file and outputs the count of each
+ * word to a text file and a sequence file, with the key and value types swapped
+ * between the two outputs.
+ */
+ @Test
+ public void testMultiOutputFormatWithReduce() throws Throwable {
+ Job job = new Job(mrConf, "MultiOutWithReduce");
+
+ job.setMapperClass(WordCountMapper.class);
+ job.setReducerClass(MultiOutWordCountReducer.class);
+ job.setJarByClass(this.getClass());
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(MultiOutputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+
+ configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
+ configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class,
+ IntWritable.class);
+ Path outDir = new Path(workDir.getPath(), job.getJobName());
+ FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1"));
+ FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2"));
+
+ configurer.configure();
+
+ String fileContent = "Hello World Hello World World";
+ String inputFile = createInputFile(fileContent);
+ FileInputFormat.setInputPaths(job, new Path(inputFile));
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ Path textOutPath = new Path(outDir, "out1/part-r-00000");
+ String[] textOutput = readFully(textOutPath).split("\n");
+ Path seqOutPath = new Path(outDir, "out2/part-r-00000");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf);
+ Text key = new Text();
+ IntWritable value = new IntWritable();
+ String[] words = "Hello World".split(" ");
+ Assert.assertEquals(words.length, textOutput.length);
+ for (int i = 0; i < words.length; i++) {
+ Assert.assertEquals((i + 2) + "\t" + words[i], textOutput[i]);
+ reader.next(key, value);
+ Assert.assertEquals(words[i], key.toString());
+ Assert.assertEquals((i + 2), value.get());
+ }
+ Assert.assertFalse(reader.next(key, value));
+
+ }
+
+
+ /**
+ * Create a file for map input
+ *
+ * @return absolute path of the file.
+ * @throws IOException if any error encountered
+ */
+ private String createInputFile(String content) throws IOException {
+ File f = File.createTempFile("input", "txt");
+ FileWriter writer = new FileWriter(f);
+ writer.write(content);
+ writer.close();
+ return f.getAbsolutePath();
+ }
+
+ private String readFully(Path file) throws IOException {
+ FSDataInputStream in = fs.open(file);
+ byte[] b = new byte[in.available()];
+ in.readFully(b);
+ in.close();
+ return new String(b);
+ }
+
+ private static class MultiOutWordIndexMapper extends
+ Mapper<LongWritable, Text, Writable, Writable> {
+
+ private IntWritable index = new IntWritable(1);
+ private Text word = new Text();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ MultiOutputFormat.write("out1", index, word, context);
+ MultiOutputFormat.write("out2", word, index, context);
+ index.set(index.get() + 1);
+ }
+ }
+ }
+
+ private static class WordCountMapper extends
+ Mapper<LongWritable, Text, Text, IntWritable> {
+
+ private final static IntWritable one = new IntWritable(1);
+ private Text word = new Text();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ StringTokenizer itr = new StringTokenizer(value.toString());
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ context.write(word, one);
+ }
+ }
+ }
+
+ private static class MultiOutWordCountReducer extends
+ Reducer<Text, IntWritable, Writable, Writable> {
+
+ private IntWritable result = new IntWritable();
+
+ @Override
+ protected void reduce(Text key, Iterable<IntWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int sum = 0;
+ for (IntWritable val : values) {
+ sum += val.get();
+ }
+ result.set(sum);
+ MultiOutputFormat.write("out1", result, key, context);
+ MultiOutputFormat.write("out2", key, result, context);
+ }
+ }
+
+}