Posted to hcatalog-commits@incubator.apache.org by av...@apache.org on 2012/06/18 21:59:51 UTC

svn commit: r1351505 - in /incubator/hcatalog/trunk: CHANGES.txt src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java

Author: avandana
Date: Mon Jun 18 21:59:50 2012
New Revision: 1351505

URL: http://svn.apache.org/viewvc?rev=1351505&view=rev
Log:
HCAT-416 MultiOutputFormat should handle merging of DistributedCache configurations

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
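
For context on the fix: MultiOutputFormat keeps a separate child configuration per output alias and, on configure(), folds each child's changes back into the parent job. This patch adds the DistributedCache keys to the set of merged configs and registers a separator per key, because the classpath keys ("mapred.job.classpath.files" and "mapred.job.classpath.archives") are joined with the platform path separator rather than a comma. A minimal, standalone illustration of why a single comma delimiter is not enough (file names are made up):

    import java.util.Arrays;
    import java.util.regex.Pattern;

    public class SeparatorSketch {
        public static void main(String[] args) {
            String pathSep = System.getProperty("path.separator"); // ":" on Unix, ";" on Windows
            // A "mapred.job.classpath.files"-style value holding two cached jars.
            String classpathFiles = "/cache/a.jar" + pathSep + "/cache/b.jar";
            // Splitting on a comma yields one bogus entry...
            System.out.println(Arrays.asList(classpathFiles.split(",")));
            // ...while splitting on the key's own separator recovers both paths.
            System.out.println(Arrays.asList(classpathFiles.split(Pattern.quote(pathSep))));
        }
    }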

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1351505&r1=1351504&r2=1351505&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Jun 18 21:59:50 2012
@@ -45,6 +45,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-416 MultiOutputFormat should handle merging of DistributedCache configurations (rohini via avandana)
+
   HCAT-423 HCatalog should include SerDe-reported fields in the table schema (traviscrawford via khorgath)
 
   HCAT-362 add --local-infile to the mysql command (arpitgupta via daijy)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java?rev=1351505&r1=1351504&r2=1351505&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java Mon Jun 18 21:59:50 2012
@@ -22,14 +22,18 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -43,7 +47,8 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The MultiOutputFormat class simplifies writing output data to multiple
@@ -128,20 +133,26 @@ import org.apache.hadoop.util.StringUtil
  */
 public class MultiOutputFormat extends OutputFormat<Writable, Writable> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(MultiOutputFormat.class.getName());
     private static final String MO_ALIASES = "mapreduce.multiout.aliases";
     private static final String MO_ALIAS = "mapreduce.multiout.alias";
     private static final String CONF_KEY_DELIM = "%%";
     private static final String CONF_VALUE_DELIM = ";;";
     private static final String COMMA_DELIM = ",";
     private static final List<String> configsToOverride = new ArrayList<String>();
-    private static final List<String> configsToMerge = new ArrayList<String>();
+    private static final Map<String, String> configsToMerge = new HashMap<String, String>();
 
     static {
         configsToOverride.add("mapred.output.dir");
-        configsToMerge.add(JobContext.JOB_NAMENODES);
-        configsToMerge.add("tmpfiles");
-        configsToMerge.add("tmpjars");
-        configsToMerge.add("tmparchives");
+        configsToOverride.add(DistributedCache.CACHE_SYMLINK);
+        configsToMerge.put(JobContext.JOB_NAMENODES, COMMA_DELIM);
+        configsToMerge.put("tmpfiles", COMMA_DELIM);
+        configsToMerge.put("tmpjars", COMMA_DELIM);
+        configsToMerge.put("tmparchives", COMMA_DELIM);
+        configsToMerge.put(DistributedCache.CACHE_ARCHIVES, COMMA_DELIM);
+        configsToMerge.put(DistributedCache.CACHE_FILES, COMMA_DELIM);
+        configsToMerge.put("mapred.job.classpath.archives", System.getProperty("path.separator"));
+        configsToMerge.put("mapred.job.classpath.files", System.getProperty("path.separator"));
     }
 
     /**
@@ -204,6 +215,7 @@ public class MultiOutputFormat extends O
     @Override
     public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
         for (String alias : getOutputFormatAliases(context)) {
+            LOGGER.debug("Calling checkOutputSpecs for alias: " + alias);
             JobContext aliasContext = getJobContext(alias, context);
             OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
             outputFormat.checkOutputSpecs(aliasContext);
@@ -264,8 +276,8 @@ public class MultiOutputFormat extends O
             String value = conf.getValue();
             String jobValue = userConf.getRaw(key);
             if (jobValue == null || !jobValue.equals(value)) {
-                if (configsToMerge.contains(key)) {
-                    String mergedValue = getMergedConfValue(jobValue, value);
+                if (configsToMerge.containsKey(key)) {
+                    String mergedValue = getMergedConfValue(jobValue, value, configsToMerge.get(key));
                     userConf.set(key, mergedValue);
                 } else {
                     if (configsToOverride.contains(key)) {
@@ -280,18 +292,18 @@ public class MultiOutputFormat extends O
         userConf.set(getAliasConfName(alias), builder.toString());
     }
 
-    private static String getMergedConfValue(String originalValues, String newValues) {
+    private static String getMergedConfValue(String originalValues, String newValues, String separator) {
         if (originalValues == null) {
             return newValues;
         }
-        Set<String> mergedValues = new HashSet<String>();
-        mergedValues.addAll(StringUtils.getStringCollection(originalValues));
-        mergedValues.addAll(StringUtils.getStringCollection(newValues));
+        Set<String> mergedValues = new LinkedHashSet<String>();
+        mergedValues.addAll(Arrays.asList(StringUtils.split(originalValues, separator)));
+        mergedValues.addAll(Arrays.asList(StringUtils.split(newValues, separator)));
         StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2);
         for (String value : mergedValues) {
-            builder.append(value).append(COMMA_DELIM);
+            builder.append(value).append(separator);
         }
-        return builder.substring(0, builder.length() - COMMA_DELIM.length());
+        return builder.substring(0, builder.length() - separator.length());
     }
 
     private static String getAliasConfName(String alias) {
@@ -422,6 +434,7 @@ public class MultiOutputFormat extends O
             baseRecordWriters = new LinkedHashMap<String, BaseRecordWriterContainer>();
             String[] aliases = getOutputFormatAliases(context);
             for (String alias : aliases) {
+                LOGGER.info("Creating record writer for alias: " + alias);
                 TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
                 Configuration aliasConf = aliasContext.getConfiguration();
                 // Create output directory if not already created.
@@ -455,7 +468,9 @@ public class MultiOutputFormat extends O
 
         @Override
         public void close(TaskAttemptContext context) throws IOException, InterruptedException {
-            for (BaseRecordWriterContainer baseRWContainer : baseRecordWriters.values()) {
+            for (Entry<String, BaseRecordWriterContainer> entry : baseRecordWriters.entrySet()) {
+                BaseRecordWriterContainer baseRWContainer = entry.getValue();
+                LOGGER.info("Closing record writer for alias: " + entry.getKey());
                 baseRWContainer.getRecordWriter().close(baseRWContainer.getContext());
             }
         }
@@ -490,6 +505,7 @@ public class MultiOutputFormat extends O
             outputCommitters = new LinkedHashMap<String, MultiOutputFormat.BaseOutputCommitterContainer>();
             String[] aliases = getOutputFormatAliases(context);
             for (String alias : aliases) {
+                LOGGER.info("Creating output committer for alias: " + alias);
                 TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
                 OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext)
                         .getOutputCommitter(aliasContext);
@@ -501,6 +517,7 @@ public class MultiOutputFormat extends O
         @Override
         public void setupJob(JobContext jobContext) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling setupJob for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().setupJob(outputContainer.getContext());
             }
@@ -509,6 +526,7 @@ public class MultiOutputFormat extends O
         @Override
         public void setupTask(TaskAttemptContext taskContext) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling setupTask for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().setupTask(outputContainer.getContext());
             }
@@ -533,6 +551,7 @@ public class MultiOutputFormat extends O
                 OutputCommitter baseCommitter = outputContainer.getBaseCommitter();
                 TaskAttemptContext committerContext = outputContainer.getContext();
                 if (baseCommitter.needsTaskCommit(committerContext)) {
+                    LOGGER.info("Calling commitTask for alias: " + alias);
                     baseCommitter.commitTask(committerContext);
                 }
             }
@@ -541,6 +560,7 @@ public class MultiOutputFormat extends O
         @Override
         public void abortTask(TaskAttemptContext taskContext) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling abortTask for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().abortTask(outputContainer.getContext());
             }
@@ -549,6 +569,7 @@ public class MultiOutputFormat extends O
         @Override
         public void commitJob(JobContext jobContext) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling commitJob for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().commitJob(outputContainer.getContext());
             }
@@ -557,6 +578,7 @@ public class MultiOutputFormat extends O
         @Override
         public void abortJob(JobContext jobContext, State state) throws IOException {
             for (String alias : outputCommitters.keySet()) {
+                LOGGER.info("Calling abortJob for alias: " + alias);
                 BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
                 outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state);
             }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java?rev=1351505&r1=1351504&r2=1351505&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java Mon Jun 18 21:59:50 2012
@@ -21,10 +21,12 @@ package org.apache.hcatalog.mapreduce;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Random;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -118,7 +120,26 @@ public class TestMultiOutputFormat {
         String inputFile = createInputFile(fileContent);
         FileInputFormat.setInputPaths(job, new Path(inputFile));
 
+        //Test for merging of configs
+        DistributedCache.addFileToClassPath(new Path(inputFile), job.getConfiguration(), fs);
+        String dummyFile = createInputFile("dummy file");
+        DistributedCache.addFileToClassPath(new Path(dummyFile), configurer.getJob("out1")
+                .getConfiguration(), fs);
+        // duplicate of the value. Merging should remove duplicates
+        DistributedCache.addFileToClassPath(new Path(inputFile), configurer.getJob("out2")
+                .getConfiguration(), fs);
+
         configurer.configure();
+
+        // Verify if the configs are merged
+        Path[] fileClassPaths = DistributedCache.getFileClassPaths(job.getConfiguration());
+        Assert.assertArrayEquals(new Path[] {new Path(inputFile), new Path(dummyFile)},
+                fileClassPaths);
+        URI[] expectedCacheFiles = new URI[] {new Path(inputFile).makeQualified(fs).toUri(),
+                new Path(dummyFile).makeQualified(fs).toUri()};
+        URI[] cacheFiles = DistributedCache.getCacheFiles(job.getConfiguration());
+        Assert.assertArrayEquals(expectedCacheFiles, cacheFiles);
+
         Assert.assertTrue(job.waitForCompletion(true));
 
         Path textOutPath = new Path(outDir, "out1/part-m-00000");