Posted to hcatalog-commits@incubator.apache.org by av...@apache.org on 2012/06/18 21:59:51 UTC
svn commit: r1351505 - in /incubator/hcatalog/trunk: CHANGES.txt
src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
Author: avandana
Date: Mon Jun 18 21:59:50 2012
New Revision: 1351505
URL: http://svn.apache.org/viewvc?rev=1351505&view=rev
Log:
HCAT-416 MultiOutputFormat should handle merging of DistributedCache configurations
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
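For context, this change makes MultiOutputFormat merge the DistributedCache-related settings (tmpfiles, tmpjars, mapred.cache.* entries, classpath keys) from the per-alias job configurations into the parent job configuration, de-duplicating repeated values and using the appropriate separator per key. A minimal usage sketch follows; it assumes MultiOutputFormat's createConfigurer()/addOutputFormat() API, which is not part of this diff (only getJob(), configure() and DistributedCache.addFileToClassPath() appear in the test changes below), and the paths and output formats are purely illustrative.

// Hedged usage sketch for the merging behaviour this patch adds.
// createConfigurer()/addOutputFormat() are assumed from the MultiOutputFormat
// API and are not shown in this diff; paths and formats are illustrative only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.mapreduce.MultiOutputFormat;
import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;

public class MultiOutputCacheMergeSketch {
    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "multi-output-cache-merge");
        job.setOutputFormatClass(MultiOutputFormat.class);

        // Two logical outputs, each with its own OutputFormat and key/value types.
        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
        configurer.addOutputFormat("out1", TextOutputFormat.class, Text.class, Text.class);
        configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class, IntWritable.class);

        // Each alias contributes its own DistributedCache classpath entry.
        FileSystem fs = FileSystem.get(job.getConfiguration());
        DistributedCache.addFileToClassPath(new Path("/tmp/lib-one.jar"),
                configurer.getJob("out1").getConfiguration(), fs);
        DistributedCache.addFileToClassPath(new Path("/tmp/lib-two.jar"),
                configurer.getJob("out2").getConfiguration(), fs);

        // With this change, configure() merges tmpfiles/tmpjars, the mapred.cache.*
        // entries and mapred.job.classpath.files from both aliases into the parent
        // job configuration, dropping duplicate values.
        configurer.configure();
    }
}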
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1351505&r1=1351504&r2=1351505&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Jun 18 21:59:50 2012
@@ -45,6 +45,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-416 MultiOutputFormat should handle merging of DistributedCache configurations (rohini via avandana)
+
HCAT-423 HCatalog should include SerDe-reported fields in the table schema (traviscrawford via khorgath)
HCAT-362 add --local-infile to the mysql command (arpitgupta via daijy)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java?rev=1351505&r1=1351504&r2=1351505&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java Mon Jun 18 21:59:50 2012
@@ -22,14 +22,18 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -43,7 +47,8 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The MultiOutputFormat class simplifies writing output data to multiple
@@ -128,20 +133,26 @@ import org.apache.hadoop.util.StringUtil
*/
public class MultiOutputFormat extends OutputFormat<Writable, Writable> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MultiOutputFormat.class.getName());
private static final String MO_ALIASES = "mapreduce.multiout.aliases";
private static final String MO_ALIAS = "mapreduce.multiout.alias";
private static final String CONF_KEY_DELIM = "%%";
private static final String CONF_VALUE_DELIM = ";;";
private static final String COMMA_DELIM = ",";
private static final List<String> configsToOverride = new ArrayList<String>();
- private static final List<String> configsToMerge = new ArrayList<String>();
+ private static final Map<String, String> configsToMerge = new HashMap<String, String>();
static {
configsToOverride.add("mapred.output.dir");
- configsToMerge.add(JobContext.JOB_NAMENODES);
- configsToMerge.add("tmpfiles");
- configsToMerge.add("tmpjars");
- configsToMerge.add("tmparchives");
+ configsToOverride.add(DistributedCache.CACHE_SYMLINK);
+ configsToMerge.put(JobContext.JOB_NAMENODES, COMMA_DELIM);
+ configsToMerge.put("tmpfiles", COMMA_DELIM);
+ configsToMerge.put("tmpjars", COMMA_DELIM);
+ configsToMerge.put("tmparchives", COMMA_DELIM);
+ configsToMerge.put(DistributedCache.CACHE_ARCHIVES, COMMA_DELIM);
+ configsToMerge.put(DistributedCache.CACHE_FILES, COMMA_DELIM);
+ configsToMerge.put("mapred.job.classpath.archives", System.getProperty("path.separator"));
+ configsToMerge.put("mapred.job.classpath.files", System.getProperty("path.separator"));
}
/**
@@ -204,6 +215,7 @@ public class MultiOutputFormat extends O
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
for (String alias : getOutputFormatAliases(context)) {
+ LOGGER.debug("Calling checkOutputSpecs for alias: " + alias);
JobContext aliasContext = getJobContext(alias, context);
OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
outputFormat.checkOutputSpecs(aliasContext);
@@ -264,8 +276,8 @@ public class MultiOutputFormat extends O
String value = conf.getValue();
String jobValue = userConf.getRaw(key);
if (jobValue == null || !jobValue.equals(value)) {
- if (configsToMerge.contains(key)) {
- String mergedValue = getMergedConfValue(jobValue, value);
+ if (configsToMerge.containsKey(key)) {
+ String mergedValue = getMergedConfValue(jobValue, value, configsToMerge.get(key));
userConf.set(key, mergedValue);
} else {
if (configsToOverride.contains(key)) {
@@ -280,18 +292,18 @@ public class MultiOutputFormat extends O
userConf.set(getAliasConfName(alias), builder.toString());
}
- private static String getMergedConfValue(String originalValues, String newValues) {
+ private static String getMergedConfValue(String originalValues, String newValues, String separator) {
if (originalValues == null) {
return newValues;
}
- Set<String> mergedValues = new HashSet<String>();
- mergedValues.addAll(StringUtils.getStringCollection(originalValues));
- mergedValues.addAll(StringUtils.getStringCollection(newValues));
+ Set<String> mergedValues = new LinkedHashSet<String>();
+ mergedValues.addAll(Arrays.asList(StringUtils.split(originalValues, separator)));
+ mergedValues.addAll(Arrays.asList(StringUtils.split(newValues, separator)));
StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2);
for (String value : mergedValues) {
- builder.append(value).append(COMMA_DELIM);
+ builder.append(value).append(separator);
}
- return builder.substring(0, builder.length() - COMMA_DELIM.length());
+ return builder.substring(0, builder.length() - separator.length());
}
private static String getAliasConfName(String alias) {
@@ -422,6 +434,7 @@ public class MultiOutputFormat extends O
baseRecordWriters = new LinkedHashMap<String, BaseRecordWriterContainer>();
String[] aliases = getOutputFormatAliases(context);
for (String alias : aliases) {
+ LOGGER.info("Creating record writer for alias: " + alias);
TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
Configuration aliasConf = aliasContext.getConfiguration();
// Create output directory if not already created.
@@ -455,7 +468,9 @@ public class MultiOutputFormat extends O
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- for (BaseRecordWriterContainer baseRWContainer : baseRecordWriters.values()) {
+ for (Entry<String, BaseRecordWriterContainer> entry : baseRecordWriters.entrySet()) {
+ BaseRecordWriterContainer baseRWContainer = entry.getValue();
+ LOGGER.info("Closing record writer for alias: " + entry.getKey());
baseRWContainer.getRecordWriter().close(baseRWContainer.getContext());
}
}
@@ -490,6 +505,7 @@ public class MultiOutputFormat extends O
outputCommitters = new LinkedHashMap<String, MultiOutputFormat.BaseOutputCommitterContainer>();
String[] aliases = getOutputFormatAliases(context);
for (String alias : aliases) {
+ LOGGER.info("Creating output committer for alias: " + alias);
TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext)
.getOutputCommitter(aliasContext);
@@ -501,6 +517,7 @@ public class MultiOutputFormat extends O
@Override
public void setupJob(JobContext jobContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling setupJob for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().setupJob(outputContainer.getContext());
}
@@ -509,6 +526,7 @@ public class MultiOutputFormat extends O
@Override
public void setupTask(TaskAttemptContext taskContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling setupTask for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().setupTask(outputContainer.getContext());
}
@@ -533,6 +551,7 @@ public class MultiOutputFormat extends O
OutputCommitter baseCommitter = outputContainer.getBaseCommitter();
TaskAttemptContext committerContext = outputContainer.getContext();
if (baseCommitter.needsTaskCommit(committerContext)) {
+ LOGGER.info("Calling commitTask for alias: " + alias);
baseCommitter.commitTask(committerContext);
}
}
@@ -541,6 +560,7 @@ public class MultiOutputFormat extends O
@Override
public void abortTask(TaskAttemptContext taskContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling abortTask for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().abortTask(outputContainer.getContext());
}
@@ -549,6 +569,7 @@ public class MultiOutputFormat extends O
@Override
public void commitJob(JobContext jobContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling commitJob for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().commitJob(outputContainer.getContext());
}
@@ -557,6 +578,7 @@ public class MultiOutputFormat extends O
@Override
public void abortJob(JobContext jobContext, State state) throws IOException {
for (String alias : outputCommitters.keySet()) {
+ LOGGER.info("Calling abortJob for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state);
}
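The heart of the change above is getMergedConfValue(), which now takes a per-key separator (a comma for tmpfiles/tmpjars/cache entries, the platform path separator for the classpath keys) and uses a LinkedHashSet so merged values keep their first-seen order while duplicates are dropped. Below is a standalone sketch of that merge behaviour, using commons-lang StringUtils as the patched code does; the class and method names are illustrative, and StringUtils.join stands in for the StringBuilder loop in the patch.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import org.apache.commons.lang.StringUtils;

public class MergeConfValueSketch {
    // Mirrors the patched getMergedConfValue(): merge two delimited config values,
    // keeping first-seen order and removing duplicate entries.
    static String merge(String originalValues, String newValues, String separator) {
        if (originalValues == null) {
            return newValues;
        }
        Set<String> merged = new LinkedHashSet<String>();
        merged.addAll(Arrays.asList(StringUtils.split(originalValues, separator)));
        merged.addAll(Arrays.asList(StringUtils.split(newValues, separator)));
        return StringUtils.join(merged, separator);
    }

    public static void main(String[] args) {
        // "b.jar" appears in both values but shows up only once in the result.
        System.out.println(merge("a.jar,b.jar", "b.jar,c.jar", ","));
        // prints: a.jar,b.jar,c.jar
    }
}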
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java?rev=1351505&r1=1351504&r2=1351505&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java Mon Jun 18 21:59:50 2012
@@ -21,10 +21,12 @@ package org.apache.hcatalog.mapreduce;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.URI;
import java.util.Random;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -118,7 +120,26 @@ public class TestMultiOutputFormat {
String inputFile = createInputFile(fileContent);
FileInputFormat.setInputPaths(job, new Path(inputFile));
+ //Test for merging of configs
+ DistributedCache.addFileToClassPath(new Path(inputFile), job.getConfiguration(), fs);
+ String dummyFile = createInputFile("dummy file");
+ DistributedCache.addFileToClassPath(new Path(dummyFile), configurer.getJob("out1")
+ .getConfiguration(), fs);
+ // duplicate of the value. Merging should remove duplicates
+ DistributedCache.addFileToClassPath(new Path(inputFile), configurer.getJob("out2")
+ .getConfiguration(), fs);
+
configurer.configure();
+
+ // Verify if the configs are merged
+ Path[] fileClassPaths = DistributedCache.getFileClassPaths(job.getConfiguration());
+ Assert.assertArrayEquals(new Path[] {new Path(inputFile), new Path(dummyFile)},
+ fileClassPaths);
+ URI[] expectedCacheFiles = new URI[] {new Path(inputFile).makeQualified(fs).toUri(),
+ new Path(dummyFile).makeQualified(fs).toUri()};
+ URI[] cacheFiles = DistributedCache.getCacheFiles(job.getConfiguration());
+ Assert.assertArrayEquals(expectedCacheFiles, cacheFiles);
+
Assert.assertTrue(job.waitForCompletion(true));
Path textOutPath = new Path(outDir, "out1/part-m-00000");