You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/10/01 03:28:53 UTC

svn commit: r1392186 - in /incubator/hcatalog/branches/branch-0.4: CHANGES.txt src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java

Author: toffer
Date: Mon Oct  1 03:28:53 2012
New Revision: 1392186

URL: http://svn.apache.org/viewvc?rev=1392186&view=rev
Log:
backported from trunk: HCAT-513  Data Store onto HCatalog table fails for dynamic partitioning as the temporary directory gets deleted by the completed map tasks (amalakar via toffer)

Modified:
    incubator/hcatalog/branches/branch-0.4/CHANGES.txt
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java

Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1392186&r1=1392185&r2=1392186&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Mon Oct  1 03:28:53 2012
@@ -65,6 +65,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-513  Data Store onto HCatalog table fails for dynamic partitioning as the temporary directory gets deleted by the completed map tasks (amalakar via toffer)
+
   HCAT-490 HCatStorer() throws error when the same partition key is present in records in more than one tasks running as part of the same job (amalakar via toffer)
 
   HCAT-494 MultiOutputFormat in 0.23 fails to setAliasConf() correctly. (mithun via toffer)

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1392186&r1=1392185&r2=1392186&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Mon Oct  1 03:28:53 2012
@@ -35,9 +35,11 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
@@ -160,6 +162,10 @@ class FileOutputCommitterContainer exten
         try {
             if (dynamicPartitioningUsed) {
                 discoverPartitions(jobContext);
+                // Commit each partition so it gets moved out of the job work dir
+                for (JobContext context : contextDiscoveredByPath.values()) {
+                    new JobConf(context.getConfiguration()).getOutputCommitter().commitJob(context);
+                }
             }
             if (getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
                 getBaseOutputCommitter().commitJob(
@@ -475,8 +481,13 @@ class FileOutputCommitterContainer exten
                     LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
                     Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
                     partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec);
-                    JobContext currContext = HCatHadoopShims.Instance.get().createJobContext(context.getConfiguration(),context.getJobID());
-                    HCatOutputFormat.configureOutputStorageHandler(context, jobInfo, fullPartSpec);
+                    JobConf jobConf = (JobConf)context.getConfiguration();
+                    JobContext currContext = HCatMapRedUtil.createJobContext(
+                        jobConf,
+                        context.getJobID(),
+                        InternalUtil.createReporter(HCatMapRedUtil.createTaskAttemptContext(jobConf,
+                            HCatHadoopShims.Instance.get().createTaskAttemptID())));
+                    HCatOutputFormat.configureOutputStorageHandler(currContext, jobInfo, fullPartSpec);
                     contextDiscoveredByPath.put(st.getPath().toString(),currContext);
                 }
             }
@@ -616,7 +627,7 @@ class FileOutputCommitterContainer exten
      * 0.9 versions. The cleanupJob method is deprecated but, Pig 0.8 and
      * 0.9 call cleanupJob method. Hence this method is used by both abortJob
      * and cleanupJob methods.
-     * @param JobContext The job context.
+     * @param context The job context.
      * @throws java.io.IOException
      */
     private void internalAbortJob(JobContext context, State state) throws IOException{

Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1392186&r1=1392185&r2=1392186&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Mon Oct  1 03:28:53 2012
@@ -146,8 +146,6 @@ class FileRecordWriterContainer extends 
                 if (baseOutputCommitter.needsTaskCommit(currContext)) {
                     baseOutputCommitter.commitTask(currContext);
                 }
-                org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currContext);
-                baseOutputCommitter.commitJob(currJobContext);
             }
         } else {
             getBaseRecordWriter().close(reporter);
@@ -205,7 +203,7 @@ class FileRecordWriterContainer extends 
                 //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition 
                 //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance
                 //In general this should be ok for most FileOutputFormat implementations
-		//but may become an issue for cases when the method is used to perform other setup tasks
+                //but may become an issue for cases when the method is used to perform other setup tasks
 
                 //setupJob()
                 baseOutputCommitter.setupJob(currJobContext);