You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by kh...@apache.org on 2011/07/22 23:38:14 UTC

svn commit: r1149763 [1/2] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/har/ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/rcfile/ src/test...

Author: khorgath
Date: Fri Jul 22 23:38:07 2011
New Revision: 1149763

URL: http://svn.apache.org/viewvc?rev=1149763&view=rev
Log:
HCATALOG-42 Dynamic Partitioning

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
Modified:
    incubator/hcatalog/trunk/build.xml
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java

Modified: incubator/hcatalog/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/build.xml?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/build.xml (original)
+++ incubator/hcatalog/trunk/build.xml Fri Jul 22 23:38:07 2011
@@ -107,6 +107,7 @@
     <fileset dir="${hive.root}/build/ivy/lib/default" includes="jdo2-api-2.3-ec.jar"/>
     <fileset dir="${hive.root}/build/ivy/lib/default" includes="datanucleus-enhancer-2.0.3.jar"/>
     <fileset dir="${hive.root}/build/ivy/lib/default" includes="datanucleus-core-2.0.3.jar"/>
+    <fileset dir="${lib.dir}" includes="hadoop_archive-0.3.1.jar"/>
  <fileset dir="${hive.root}/lib" includes="asm-3.1.jar"/>
   </path>
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java Fri Jul 22 23:38:07 2011
@@ -41,7 +41,7 @@ public enum ErrorType {
     ERROR_INVALID_PARTITION_VALUES      (2010, "Invalid partition values specified"),
     ERROR_MISSING_PARTITION_KEY         (2011, "Partition key value not provided for publish"),
     ERROR_MOVE_FAILED                   (2012, "Moving of data failed during commit"),
-
+    ERROR_TOO_MANY_DYNAMIC_PTNS         (2013, "Attempt to create too many dynamic partitions"),
 
     /* Authorization Errors 3000 - 3999 */
     ERROR_ACCESS_CONTROL           (3000, "Permission denied"),

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java Fri Jul 22 23:38:07 2011
@@ -64,13 +64,18 @@ public final class HCatConstants {
   public static final String HCAT_KEY_OUTPUT_INFO = HCAT_KEY_OUTPUT_BASE + ".info";
   public static final String HCAT_KEY_HIVE_CONF = HCAT_KEY_OUTPUT_BASE + ".hive.conf";
   public static final String HCAT_KEY_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".token.sig";
-  
+  public static final String HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.sig";
+  public static final String HCAT_KEY_JOBCLIENT_TOKEN_STRFORM = HCAT_KEY_OUTPUT_BASE + ".jobclient.token.strform";
+
   public static final String HCAT_MSG_CLEAN_FREQ = "hcat.msg.clean.freq";
   public static final String HCAT_MSG_EXPIRY_DURATION = "hcat.msg.expiry.duration";
   
   public static final String HCAT_MSGBUS_TOPIC_NAME = "hcat.msgbus.topic.name";
   public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
   public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
+  
+  public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
+  public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
 
   // Message Bus related properties.
   public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat";

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Fri Jul 22 23:38:07 2011
@@ -28,21 +28,42 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
 
 public class HCatUtil {
 
+//  static final private Log LOG = LogFactory.getLog(HCatUtil.class);
+
   public static boolean checkJobContextIfRunningFromBackend(JobContext j){
     if (j.getConfiguration().get("mapred.task.id", "").equals("")){
       return false;
@@ -256,4 +277,102 @@ public class HCatUtil {
     return true;
   }
 
+  public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> getJobTrackerDelegationToken(Configuration conf, String userName) throws Exception {
+//    LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
+    JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
+    Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl.getDelegationToken(new Text(userName));
+//    LOG.info("got "+t);
+    return t;
+    
+//    return null;
+  }
+
+  public static void cancelJobTrackerDelegationToken(String tokenStrForm, String tokenSignature) throws Exception {
+//    LOG.info("cancelJobTrackerDelegationToken("+tokenStrForm+","+tokenSignature+")");
+    JobClient jcl = new JobClient(new JobConf(new Configuration(), HCatOutputFormat.class));
+    Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = extractJobTrackerToken(tokenStrForm,tokenSignature);
+//    LOG.info("canceling "+t);
+    try {
+      jcl.cancelDelegationToken(t);
+    }catch(Exception e){
+//      HCatUtil.logToken(LOG, "jcl token to cancel", t);
+      // ignore if token has already been invalidated.
+    }
+  }
+  
+  
+  public static Token<? extends AbstractDelegationTokenIdentifier> 
+      extractThriftToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException {
+//    LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")");
+    Token<? extends AbstractDelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    t.setService(new Text(tokenSignature));
+//    LOG.info("returning "+t);
+    return t;
+  }
+
+  public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> 
+      extractJobTrackerToken(String tokenStrForm, String tokenSignature) throws MetaException, TException, IOException {
+//    LOG.info("extractJobTrackerToken("+tokenStrForm+","+tokenSignature+")");
+    Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = 
+        new Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier>();
+    t.decodeFromUrlString(tokenStrForm);
+    t.setService(new Text(tokenSignature));
+//    LOG.info("returning "+t);
+    return t;
+  }
+
+  /**
+   * Logging stack trace
+   * @param logger
+   */
+  public static void logStackTrace(Log logger) {
+    StackTraceElement[] stackTrace = new Exception().getStackTrace();
+    for (int i = 1 ; i < stackTrace.length ; i++){
+      logger.info("\t"+stackTrace[i].toString());
+    }
+  }
+
+  /**
+   * debug log the hive conf
+   * @param logger
+   * @param hc
+   */
+  public static void logHiveConf(Log logger, HiveConf hc){
+    logEntrySet(logger,"logging hiveconf:",hc.getAllProperties().entrySet());
+  }
+
+  
+  public static void logList(Log logger, String itemName, List<? extends Object> list){
+      logger.info(itemName+":");
+      for (Object item : list){
+          logger.info("\t["+item+"]");
+      }
+  }
+  
+  public static void logMap(Log logger, String itemName, Map<? extends Object,? extends Object> map){
+    logEntrySet(logger,itemName,map.entrySet());
+  }
+
+  public static void logEntrySet(Log logger, String itemName, Set<? extends Entry> entrySet) {
+    logger.info(itemName+":");
+    for (Entry e : entrySet){
+      logger.info("\t["+e.getKey()+"]=>["+e.getValue()+"]");
+    }
+  }
+
+  public static void logAllTokens(Log logger, JobContext context) throws IOException {
+    for (Token<? extends TokenIdentifier>t : context.getCredentials().getAllTokens()){
+      logToken(logger,"token",t);
+    }
+  }
+
+  public static void logToken(Log logger, String itemName, Token<? extends TokenIdentifier> t) throws IOException {
+    logger.info(itemName+":");
+    logger.info("\tencodeToUrlString : "+t.encodeToUrlString());
+    logger.info("\ttoString : "+t.toString());
+    logger.info("\tkind : "+t.getKind());
+    logger.info("\tservice : "+t.getService());
+  }
+  
 }

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java?rev=1149763&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/har/HarOutputCommitterPostProcessor.java Fri Jul 22 23:38:07 2011
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.har;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Constants;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.tools.HadoopArchives;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+
+public class HarOutputCommitterPostProcessor {
+  
+//  static final private Log LOG = LogFactory.getLog(HarOutputCommitterPostProcessor.class);
+
+  boolean isEnabled = false;
+  
+  public boolean isEnabled() {
+    return isEnabled;
+  }
+
+  public void setEnabled(boolean enabled) {
+    this.isEnabled = enabled;
+  }
+
+
+  public void exec(JobContext context, Partition partition, Path partPath) throws IOException {
+//    LOG.info("Archiving partition ["+partPath.toString()+"]");
+    makeHar(context,partPath.toUri().toString(),harFile(partPath));
+    partition.getParameters().put(Constants.IS_ARCHIVED, "true");
+  }
+  
+  public String harFile(Path ptnPath) throws IOException{
+    String harFile = ptnPath.toString().replaceFirst("/+$", "") + ".har";
+//    LOG.info("har file : " + harFile);
+    return harFile;
+  }
+
+  public String getParentFSPath(Path ptnPath) throws IOException {
+    return ptnPath.toUri().getPath().replaceFirst("/+$", "");
+  }
+
+  public String getProcessedLocation(Path ptnPath) throws IOException {
+    String harLocn = ("har://" + ptnPath.toUri().getPath()).replaceFirst("/+$", "") + ".har" + Path.SEPARATOR;
+//    LOG.info("har location : " + harLocn);
+    return harLocn;
+  }
+  
+
+  /**
+   * Creates a har file from the contents of a given directory, using that as root.
+   * @param dir Directory to archive
+   * @param harName The HAR file to create
+   */
+  public static void makeHar(JobContext context, String dir, String harFile) throws IOException{
+//    Configuration conf = context.getConfiguration();
+//    Credentials creds = context.getCredentials();
+    
+//    HCatUtil.logAllTokens(LOG,context);
+    
+    int lastSep = harFile.lastIndexOf(Path.SEPARATOR_CHAR);
+    Path archivePath = new Path(harFile.substring(0,lastSep));
+    final String[] args = {
+        "-archiveName",
+        harFile.substring(lastSep+1, harFile.length()),
+        "-p",
+        dir,
+        "*",
+        archivePath.toString()
+    };
+//    for (String arg : args){
+//      LOG.info("Args to har : "+ arg);
+//    }
+    try {
+      Configuration newConf = new Configuration();
+      FileSystem fs = archivePath.getFileSystem(newConf);
+      
+      newConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+//      LOG.info("System.getenv(\"HADOOP_TOKEN_FILE_LOCATION\") =["+  System.getenv("HADOOP_TOKEN_FILE_LOCATION")+"]");
+
+//      for (FileStatus ds : fs.globStatus(new Path(dir, "*"))){
+//        LOG.info("src : "+ds.getPath().toUri().toString());
+//      }
+
+      final HadoopArchives har = new HadoopArchives(newConf);
+      int rc = ToolRunner.run(har, args);
+      if (rc!= 0){
+        throw new Exception("Har returned error code "+rc);
+      }
+
+//      for (FileStatus hs : fs.globStatus(new Path(harFile, "*"))){
+//        LOG.info("dest : "+hs.getPath().toUri().toString());
+//      }
+//      doHarCheck(fs,harFile);
+//      LOG.info("Nuking " + dir);
+      fs.delete(new Path(dir), true);
+    } catch (Exception e){
+      throw new HCatException("Error creating Har ["+harFile+"] from ["+dir+"]", e);
+    }
+  }
+
+}

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java Fri Jul 22 23:38:07 2011
@@ -34,7 +34,7 @@ public abstract class HCatBaseOutputComm
   /** The underlying output committer */
   protected final OutputCommitter baseCommitter;
 
-  public HCatBaseOutputCommitter(OutputCommitter baseCommitter) {
+  public HCatBaseOutputCommitter(JobContext context, OutputCommitter baseCommitter) {
     this.baseCommitter = baseCommitter;
   }
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -20,9 +20,14 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -36,6 +41,8 @@ import org.apache.hcatalog.data.schema.H
 
 public abstract class HCatBaseOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
 
+//  static final private Log LOG = LogFactory.getLog(HCatBaseOutputFormat.class);
+
   /**
    * Gets the table schema for the table specified in the HCatOutputFormat.setOutput call
    * on the specified job context.
@@ -83,7 +90,7 @@ public abstract class HCatBaseOutputForm
    * @return the OutputJobInfo object
    * @throws IOException the IO exception
    */
-  static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
+  public static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
       String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
       if( jobString == null ) {
           throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED);
@@ -102,33 +109,107 @@ public abstract class HCatBaseOutputForm
   @SuppressWarnings("unchecked")
   static HCatOutputStorageDriver getOutputDriverInstance(
           JobContext jobContext, OutputJobInfo jobInfo) throws IOException {
+    return getOutputDriverInstance(jobContext,jobInfo,(List<String>)null);
+  }
+
+  /**
+   * Gets the output storage driver instance, with allowing specification of missing dynamic partvals
+   * @param jobContext the job context
+   * @param jobInfo the output job info
+   * @return the output driver instance
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  static HCatOutputStorageDriver getOutputDriverInstance(
+          JobContext jobContext, OutputJobInfo jobInfo, List<String> dynamicPartVals) throws IOException {
       try {
           Class<? extends HCatOutputStorageDriver> driverClass =
               (Class<? extends HCatOutputStorageDriver>)
               Class.forName(jobInfo.getStorerInfo().getOutputSDClass());
           HCatOutputStorageDriver driver = driverClass.newInstance();
 
+          Map<String, String> partitionValues = jobInfo.getTableInfo().getPartitionValues();
+          String location = jobInfo.getLocation();
+
+          if (dynamicPartVals != null){
+            // dynamic part vals specified
+            List<String> dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys();
+            if (dynamicPartVals.size() != dynamicPartKeys.size()){
+              throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, 
+                  "Unable to instantiate dynamic partitioning storage driver, mismatch between"
+                  + " number of partition values obtained["+dynamicPartVals.size()
+                  + "] and number of partition values required["+dynamicPartKeys.size()+"]");
+            }
+            for (int i = 0; i < dynamicPartKeys.size(); i++){
+              partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i));
+            }
+
+            // re-home location, now that we know the rest of the partvals
+            Table table = jobInfo.getTable();
+            
+            List<String> partitionCols = new ArrayList<String>();
+            for(FieldSchema schema : table.getPartitionKeys()) {
+              partitionCols.add(schema.getName());
+            }
+
+            location = driver.getOutputLocation(jobContext,
+                table.getSd().getLocation() , partitionCols,
+                partitionValues,jobContext.getConfiguration().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
+          }
+
           //Initialize the storage driver
           driver.setSchema(jobContext, jobInfo.getOutputSchema());
-          driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues());
-          driver.setOutputPath(jobContext, jobInfo.getLocation());
+          driver.setPartitionValues(jobContext, partitionValues);
+          driver.setOutputPath(jobContext, location);
+          
+//          HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
 
           driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties());
 
           return driver;
       } catch(Exception e) {
+        if (e instanceof HCatException){
+          throw (HCatException)e;
+        }else{
           throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e);
+        }
       }
   }
 
+  /**
+   * Gets the output storage driver instance, with allowing specification 
+   * of partvals from which it picks the dynamic partvals
+   * @param jobContext the job context
+   * @param jobInfo the output job info
+   * @return the output driver instance
+   * @throws IOException
+   */
+
+  protected static HCatOutputStorageDriver getOutputDriverInstance(
+      JobContext context, OutputJobInfo jobInfo,
+      Map<String, String> fullPartSpec) throws IOException {
+    List<String> dynamicPartKeys = jobInfo.getTableInfo().getDynamicPartitioningKeys();
+    if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){
+      return getOutputDriverInstance(context,jobInfo,(List<String>)null);
+    }else{
+      List<String> dynKeyVals = new ArrayList<String>();
+      for (String dynamicPartKey : dynamicPartKeys){
+        dynKeyVals.add(fullPartSpec.get(dynamicPartKey));
+      }
+      return getOutputDriverInstance(context,jobInfo,dynKeyVals);
+    }
+  }
+
+
   protected static void setPartDetails(OutputJobInfo jobInfo, final HCatSchema schema,
       Map<String, String> partMap) throws HCatException, IOException {
     List<Integer> posOfPartCols = new ArrayList<Integer>();
+    List<Integer> posOfDynPartCols = new ArrayList<Integer>();
 
     // If partition columns occur in data, we want to remove them.
     // So, find out positions of partition columns in schema provided by user.
     // We also need to update the output Schema with these deletions.
-
+    
     // Note that, output storage drivers never sees partition columns in data
     // or schema.
 
@@ -140,8 +221,26 @@ public abstract class HCatBaseOutputForm
         schemaWithoutParts.remove(schema.get(partKey));
       }
     }
+
+    // Also, if dynamic partitioning is being used, we want to
+    // set appropriate list of columns for the columns to be dynamically specified.
+    // These would be partition keys too, so would also need to be removed from 
+    // output schema and partcols
+
+    if (jobInfo.getTableInfo().isDynamicPartitioningUsed()){
+      for (String partKey : jobInfo.getTableInfo().getDynamicPartitioningKeys()){
+        Integer idx;
+        if((idx = schema.getPosition(partKey)) != null){
+          posOfPartCols.add(idx);
+          posOfDynPartCols.add(idx);
+          schemaWithoutParts.remove(schema.get(partKey));
+        }
+      }
+    }
+    
     HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts);
     jobInfo.setPosOfPartCols(posOfPartCols);
+    jobInfo.setPosOfDynPartCols(posOfDynPartCols);
     jobInfo.setOutputSchema(schemaWithoutParts);
   }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java Fri Jul 22 23:38:07 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.E
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatException;
 
@@ -47,8 +48,8 @@ public class HCatEximOutputCommitter ext
 
   private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class);
 
-  public HCatEximOutputCommitter(OutputCommitter baseCommitter) {
-    super(baseCommitter);
+  public HCatEximOutputCommitter(JobContext context, OutputCommitter baseCommitter) {
+    super(context,baseCommitter);
   }
 
   @Override

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -91,7 +91,7 @@ public class HCatEximOutputFormat extend
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
       OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
-      return new HCatEximOutputCommitter(outputFormat.getOutputCommitter(context));
+      return new HCatEximOutputCommitter(context,outputFormat.getOutputCommitter(context));
   }
 
   public static void setOutput(Job job, String dbname, String tablename, String location,

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputCommitter.java Fri Jul 22 23:38:07 2011
@@ -21,26 +21,35 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Constants;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
@@ -49,49 +58,105 @@ import org.apache.hcatalog.common.HCatUt
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
 import org.apache.thrift.TException;
 
 public class HCatOutputCommitter extends OutputCommitter {
 
+//    static final private Log LOG = LogFactory.getLog(HCatOutputCommitter.class);
+
     /** The underlying output committer */
     private final OutputCommitter baseCommitter;
 
-    public HCatOutputCommitter(OutputCommitter baseCommitter) {
+    private final boolean dynamicPartitioningUsed;
+    private boolean partitionsDiscovered;
+
+    private Map<String, Map<String, String>> partitionsDiscoveredByPath;
+    private Map<String, HCatOutputStorageDriver> storageDriversDiscoveredByPath;
+    
+    HarOutputCommitterPostProcessor harProcessor = new HarOutputCommitterPostProcessor();
+
+    private String ptnRootLocation = null;
+
+    public HCatOutputCommitter(JobContext context, OutputCommitter baseCommitter) throws IOException {
+      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+      dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
+      if (!dynamicPartitioningUsed){
         this.baseCommitter = baseCommitter;
+        this.partitionsDiscovered = true;
+      }else{
+        this.baseCommitter = null;
+        this.partitionsDiscovered = false;
+      }
     }
 
     @Override
     public void abortTask(TaskAttemptContext context) throws IOException {
+      if (!dynamicPartitioningUsed){
         baseCommitter.abortTask(context);
+      }
     }
 
     @Override
     public void commitTask(TaskAttemptContext context) throws IOException {
+      if (!dynamicPartitioningUsed){
         baseCommitter.commitTask(context);
+      }else{
+        // called explicitly through HCatRecordWriter.close() if dynamic
+      }
     }
 
     @Override
     public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+      if (!dynamicPartitioningUsed){
         return baseCommitter.needsTaskCommit(context);
+      }else{
+        // called explicitly through HCatRecordWriter.close() if dynamic - return false by default
+        return false;
+      }
     }
 
     @Override
     public void setupJob(JobContext context) throws IOException {
-      if( baseCommitter != null ) {
-        baseCommitter.setupJob(context);
-      }
+        if( baseCommitter != null ) {
+          baseCommitter.setupJob(context);
+        }
+        // in dynamic usecase, called through HCatRecordWriter 
     }
 
     @Override
     public void setupTask(TaskAttemptContext context) throws IOException {
+      if (!dynamicPartitioningUsed){
         baseCommitter.setupTask(context);
-    }
+      }else{
+        // called explicitly through HCatRecordWriter.write() if dynamic
+      }
+    } 
 
     @Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
+      
+      if (dynamicPartitioningUsed){
+        discoverPartitions(jobContext);
+      }
+
       if(baseCommitter != null) {
         baseCommitter.abortJob(jobContext, state);
+      }else{
+        if (dynamicPartitioningUsed){
+          for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+            try {
+              baseOsd.abortOutputCommitterJob(
+                  new TaskAttemptContext(
+                      jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
+                      ),state);
+            } catch (Exception e) {
+              throw new IOException(e);
+            }
+          }
+        }
       }
+
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
 
       try {
@@ -106,6 +171,13 @@ public class HCatOutputCommitter extends
             (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
           client.cancelDelegationToken(tokenStrForm);
         }
+        
+        String jcTokenStrForm = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+        String jcTokenSignature = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+        if(jcTokenStrForm != null && jcTokenSignature != null) {
+          HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature);
+        }
+        
       } catch(Exception e) {
         if( e instanceof HCatException ) {
           throw (HCatException) e;
@@ -114,8 +186,16 @@ public class HCatOutputCommitter extends
         }
       }
 
-      Path src = new Path(jobInfo.getLocation());
+      Path src; 
+      if (dynamicPartitioningUsed){
+        src = new Path(getPartitionRootLocation(
+            jobInfo.getLocation().toString(),jobInfo.getTable().getPartitionKeysSize()
+            ));
+      }else{
+        src = new Path(jobInfo.getLocation());
+      }
       FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+//      LOG.warn("abortJob about to delete ["+src.toString() +"]");
       fs.delete(src, true);
     }
 
@@ -130,6 +210,10 @@ public class HCatOutputCommitter extends
 
     @Override
     public void commitJob(JobContext jobContext) throws IOException {
+      if (dynamicPartitioningUsed){
+        discoverPartitions(jobContext);
+      }
+
       if(baseCommitter != null) {
         baseCommitter.commitJob(jobContext);
       }
@@ -153,12 +237,15 @@ public class HCatOutputCommitter extends
 
     @Override
     public void cleanupJob(JobContext context) throws IOException {
+      if (dynamicPartitioningUsed){
+        discoverPartitions(context);
+      }
+
 
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
       Configuration conf = context.getConfiguration();
       Table table = jobInfo.getTable();
-      StorageDescriptor tblSD = table.getSd();
-      Path tblPath = new Path(tblSD.getLocation());
+      Path tblPath = new Path(table.getSd().getLocation());
       FileSystem fs = tblPath.getFileSystem(conf);
 
       if( table.getPartitionKeys().size() == 0 ) {
@@ -166,75 +253,116 @@ public class HCatOutputCommitter extends
 
         if( baseCommitter != null ) {
           baseCommitter.cleanupJob(context);
+        }else{
+          if (dynamicPartitioningUsed){
+            for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+              try {
+                baseOsd.cleanupOutputCommitterJob(
+                    new TaskAttemptContext(
+                        context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
+                        ));
+              } catch (Exception e) {
+                throw new IOException(e);
+              }
+            }
+          }
         }
-
+        
         //Move data from temp directory the actual table directory
         //No metastore operation required.
         Path src = new Path(jobInfo.getLocation());
-        moveTaskOutputs(fs, src, src, tblPath);
+        moveTaskOutputs(fs, src, src, tblPath,false);
         fs.delete(src, true);
         return;
       }
 
       HiveMetaStoreClient client = null;
       List<String> values = null;
-      boolean partitionAdded = false;
       HCatTableInfo tableInfo = jobInfo.getTableInfo();
 
+      List<Partition> partitionsAdded = new ArrayList<Partition>();
+
       try {
         client = HCatOutputFormat.createHiveClient(tableInfo.getServerUri(), conf);
 
         StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
 
-        Partition partition = new Partition();
-        partition.setDbName(tableInfo.getDatabaseName());
-        partition.setTableName(tableInfo.getTableName());
-        partition.setSd(new StorageDescriptor(tblSD));
-        partition.getSd().setLocation(jobInfo.getLocation());
-
         updateTableSchema(client, table, jobInfo.getOutputSchema());
+        
+        FileStatus tblStat = fs.getFileStatus(tblPath);
+        String grpName = tblStat.getGroup();
+        FsPermission perms = tblStat.getPermission();
 
-        List<FieldSchema> fields = new ArrayList<FieldSchema>();
-        for(HCatFieldSchema fieldSchema : jobInfo.getOutputSchema().getFields()) {
-          fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema));
+        List<Partition> partitionsToAdd = new ArrayList<Partition>();
+        if (!dynamicPartitioningUsed){
+          partitionsToAdd.add(
+              constructPartition(
+                  context,
+                  tblPath.toString(), tableInfo.getPartitionValues()
+                  ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                  ,table, fs
+                  ,grpName,perms));
+        }else{
+          for (Entry<String,Map<String,String>> entry : partitionsDiscoveredByPath.entrySet()){
+            partitionsToAdd.add(
+                constructPartition(
+                    context,
+                    getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
+                    ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
+                    ,table, fs
+                    ,grpName,perms));
+          }
         }
 
-        partition.getSd().setCols(fields);
-
-        Map<String,String> partKVs = tableInfo.getPartitionValues();
-        //Get partition value list
-        partition.setValues(getPartitionValueList(table,partKVs));
-
-        Map<String, String> params = new HashMap<String, String>();
-        params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass());
-        params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass());
+        //Publish the new partition(s)
+        if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+          
+          Path src = new Path(ptnRootLocation);
+
+          // check here for each dir we're copying out, to see if it already exists, error out if so
+          moveTaskOutputs(fs, src, src, tblPath,true);
+          
+          moveTaskOutputs(fs, src, src, tblPath,false);
+          fs.delete(src, true);
+          
+          
+//          for (Partition partition : partitionsToAdd){
+//            partitionsAdded.add(client.add_partition(partition));
+//            // currently following add_partition instead of add_partitions because latter isn't 
+//            // all-or-nothing and we want to be able to roll back partitions we added if need be.
+//          }
 
-        //Copy table level hcat.* keys to the partition
-        for(Map.Entry<Object, Object> entry : storer.getProperties().entrySet()) {
-          params.put(entry.getKey().toString(), entry.getValue().toString());
-        }
+          try {
+            client.add_partitions(partitionsToAdd);
+            partitionsAdded = partitionsToAdd;
+          } catch (Exception e){
+            // There was an error adding partitions : rollback fs copy and rethrow
+            for (Partition p : partitionsToAdd){
+              Path ptnPath = new Path(harProcessor.getParentFSPath(new Path(p.getSd().getLocation())));
+              if (fs.exists(ptnPath)){
+                fs.delete(ptnPath,true);
+              }
+            }
+            throw e;
+          }
 
-        partition.setParameters(params);
+        }else{
+          // no harProcessor, regular operation
 
-        // Sets permissions and group name on partition dirs.
-        FileStatus tblStat = fs.getFileStatus(tblPath);
-        String grpName = tblStat.getGroup();
-        FsPermission perms = tblStat.getPermission();
-        Path partPath = tblPath;
-        for(FieldSchema partKey : table.getPartitionKeys()){
-          partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
-          fs.setPermission(partPath, perms);
-          try{
-            fs.setOwner(partPath, null, grpName);
-          } catch(AccessControlException ace){
-            // log the messages before ignoring. Currently, logging is not built in Hcatalog.
+          // No duplicate partition publish case to worry about because we'll 
+          // get a AlreadyExistsException here if so, and appropriately rollback
+          
+          client.add_partitions(partitionsToAdd);
+          partitionsAdded = partitionsToAdd;
+
+          if (dynamicPartitioningUsed && (partitionsAdded.size()>0)){
+            Path src = new Path(ptnRootLocation);
+            moveTaskOutputs(fs, src, src, tblPath,false);
+            fs.delete(src, true);
           }
+          
         }
-
-        //Publish the new partition
-        client.add_partition(partition);
-        partitionAdded = true; //publish to metastore done
-
+        
         if( baseCommitter != null ) {
           baseCommitter.cleanupJob(context);
         }
@@ -247,13 +375,24 @@ public class HCatOutputCommitter extends
             (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
           client.cancelDelegationToken(tokenStrForm);
         }
+
+        String jcTokenStrForm = 
+            context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+        String jcTokenSignature = 
+            context.getConfiguration().get(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+        if(jcTokenStrForm != null && jcTokenSignature != null) {
+          HCatUtil.cancelJobTrackerDelegationToken(tokenStrForm,jcTokenSignature);
+        }
+
       } catch (Exception e) {
 
-        if( partitionAdded ) {
+        if( partitionsAdded.size() > 0 ) {
           try {
             //baseCommitter.cleanupJob failed, try to clean up the metastore
+            for (Partition p : partitionsAdded){
             client.dropPartition(tableInfo.getDatabaseName(),
-                    tableInfo.getTableName(), values);
+                    tableInfo.getTableName(), p.getValues());
+            }
           } catch(Exception te) {
             //Keep cause as the original exception
             throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
@@ -272,6 +411,114 @@ public class HCatOutputCommitter extends
       }
     }
 
+    private String getPartitionRootLocation(String ptnLocn,int numPtnKeys) {
+      if (ptnRootLocation  == null){
+        // we only need to calculate it once, it'll be the same for other partitions in this job.
+        Path ptnRoot = new Path(ptnLocn);
+        for (int i = 0; i < numPtnKeys; i++){
+//          LOG.info("Getting parent of "+ptnRoot.getName());
+          ptnRoot = ptnRoot.getParent();
+        }
+        ptnRootLocation = ptnRoot.toString();
+      }
+//      LOG.info("Returning final parent : "+ptnRootLocation);
+      return ptnRootLocation;
+    }
+
+    /**
+     * Generate partition metadata object to be used to add to metadata.
+     * @param partLocnRoot The table-equivalent location root of the partition
+     *                       (temporary dir if dynamic partition, table dir if static)
+     * @param partKVs The keyvalue pairs that form the partition
+     * @param outputSchema The output schema for the partition
+     * @param params The parameters to store inside the partition
+     * @param table The Table metadata object under which this Partition will reside
+     * @param fs FileSystem object to operate on the underlying filesystem
+     * @param grpName Group name that owns the table dir
+     * @param perms FsPermission that's the default permission of the table dir.
+     * @return Constructed Partition metadata object
+     * @throws IOException
+     */
+    
+    private Partition constructPartition(
+        JobContext context,
+        String partLocnRoot, Map<String,String> partKVs,
+        HCatSchema outputSchema, Map<String, String> params, 
+        Table table, FileSystem fs,
+        String grpName, FsPermission perms) throws IOException {
+
+      StorageDescriptor tblSD = table.getSd();
+      
+      Partition partition = new Partition();
+      partition.setDbName(table.getDbName());
+      partition.setTableName(table.getTableName());
+      partition.setSd(new StorageDescriptor(tblSD));
+
+      List<FieldSchema> fields = new ArrayList<FieldSchema>();
+      for(HCatFieldSchema fieldSchema : outputSchema.getFields()) {
+        fields.add(HCatSchemaUtils.getFieldSchema(fieldSchema));
+      }
+
+      partition.getSd().setCols(fields);
+
+      partition.setValues(getPartitionValueList(table,partKVs));
+
+      partition.setParameters(params);
+
+      // Sets permissions and group name on partition dirs.
+
+      Path partPath = new Path(partLocnRoot);
+      for(FieldSchema partKey : table.getPartitionKeys()){
+        partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+//        LOG.info("Setting perms for "+partPath.toString());
+        fs.setPermission(partPath, perms);
+        try{
+          fs.setOwner(partPath, null, grpName);
+        } catch(AccessControlException ace){
+          // log the messages before ignoring. Currently, logging is not built in Hcatalog.
+//          LOG.warn(ace);
+        }
+      }
+      if (dynamicPartitioningUsed){
+        String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
+        if (harProcessor.isEnabled()){
+          harProcessor.exec(context, partition, partPath);
+          partition.getSd().setLocation(
+              harProcessor.getProcessedLocation(new Path(dynamicPartitionDestination)));
+        }else{
+          partition.getSd().setLocation(dynamicPartitionDestination);
+        }
+      }else{
+        partition.getSd().setLocation(partPath.toString());
+      }
+
+      return partition;
+    }
+
+
+
+    private String getFinalDynamicPartitionDestination(Table table, Map<String,String> partKVs) {
+      // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA  ->
+      // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+      Path partPath = new Path(table.getSd().getLocation());
+      for(FieldSchema partKey : table.getPartitionKeys()){
+        partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+      }
+      return partPath.toString();
+    }
+
+    private Map<String, String> getStorerParameterMap(StorerInfo storer) {
+      Map<String, String> params = new HashMap<String, String>();
+      params.put(HCatConstants.HCAT_ISD_CLASS, storer.getInputSDClass());
+      params.put(HCatConstants.HCAT_OSD_CLASS, storer.getOutputSDClass());
+
+      //Copy table level hcat.* keys to the partition
+      for(Map.Entry<Object, Object> entry : storer.getProperties().entrySet()) {
+        params.put(entry.getKey().toString(), entry.getValue().toString());
+      }
+      return params;
+    }
+
     private Path constructPartialPartPath(Path partialPath, String partKey, Map<String,String> partKVs){
 
       StringBuilder sb = new StringBuilder(FileUtils.escapePathName(partKey));
@@ -344,31 +591,42 @@ public class HCatOutputCommitter extends
      * @param file the file to move
      * @param src the source directory
      * @param dest the target directory
+     * @param dryRun - a flag that simply tests if this move would succeed or not based 
+     *                 on whether other files exist where we're trying to copy
      * @throws IOException
      */
     private void moveTaskOutputs(FileSystem fs,
                                  Path file,
                                  Path src,
-                                 Path dest) throws IOException {
+                                 Path dest, boolean dryRun) throws IOException {
       if (fs.isFile(file)) {
         Path finalOutputPath = getFinalPath(file, src, dest);
 
-        if (!fs.rename(file, finalOutputPath)) {
-          if (!fs.delete(finalOutputPath, true)) {
-            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
+        if (dryRun){
+//        LOG.info("Testing if moving ["+file+"] to ["+finalOutputPath+"] would cause a problem");
+          if (fs.exists(finalOutputPath)){
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + ", duplicate publish possible.");
           }
+        }else{
+//        LOG.info("Moving ["+file+"] to ["+finalOutputPath+"]");
           if (!fs.rename(file, finalOutputPath)) {
-            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+            if (!fs.delete(finalOutputPath, true)) {
+              throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath);
+            }
+            if (!fs.rename(file, finalOutputPath)) {
+              throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + dest);
+            }
           }
         }
       } else if(fs.getFileStatus(file).isDir()) {
         FileStatus[] paths = fs.listStatus(file);
         Path finalOutputPath = getFinalPath(file, src, dest);
-        fs.mkdirs(finalOutputPath);
-
+        if (!dryRun){
+          fs.mkdirs(finalOutputPath);
+        }
         if (paths != null) {
           for (FileStatus path : paths) {
-            moveTaskOutputs(fs, path.getPath(), src, dest);
+            moveTaskOutputs(fs, path.getPath(), src, dest,dryRun);
           }
         }
       }
@@ -398,4 +656,72 @@ public class HCatOutputCommitter extends
       }
     }
 
+    /**
+     * Run to discover dynamic partitions available
+     */
+    private void discoverPartitions(JobContext context) throws IOException {
+      if (!partitionsDiscovered){
+        //      LOG.info("discover ptns called");
+
+        OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+
+        harProcessor.setEnabled(jobInfo.getHarRequested());
+
+        List<Integer> dynamicPartCols = jobInfo.getPosOfDynPartCols();
+        int maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
+
+        Path loadPath = new Path(jobInfo.getLocation());
+        FileSystem fs = loadPath.getFileSystem(context.getConfiguration());
+
+        // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
+
+        String dynPathSpec = loadPath.toUri().getPath();
+        dynPathSpec = dynPathSpec.replaceAll("__HIVE_DEFAULT_PARTITION__", "*");
+        // TODO : replace this with a param pull from HiveConf
+
+        //      LOG.info("Searching for "+dynPathSpec);
+        Path pathPattern = new Path(loadPath, dynPathSpec);
+        FileStatus[] status = fs.globStatus(pathPattern);
+
+        partitionsDiscoveredByPath = new LinkedHashMap<String,Map<String, String>>();
+        storageDriversDiscoveredByPath = new LinkedHashMap<String,HCatOutputStorageDriver>();
+
+
+        if (status.length == 0) {
+          //        LOG.warn("No partition found genereated by dynamic partitioning in ["
+          //            +loadPath+"] with depth["+jobInfo.getTable().getPartitionKeysSize()
+          //            +"], dynSpec["+dynPathSpec+"]");
+        }else{
+          if ((maxDynamicPartitions != -1) && (status.length > maxDynamicPartitions)){
+            this.partitionsDiscovered = true;
+            throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, 
+                "Number of dynamic partitions being created "
+                    + "exceeds configured max allowable partitions["
+                    + maxDynamicPartitions 
+                    + "], increase parameter [" 
+                    + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                    + "] if needed.");
+          }
+
+          for (FileStatus st : status){
+            LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
+            Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+            partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec);
+            storageDriversDiscoveredByPath.put(st.getPath().toString(), 
+                HCatOutputFormat.getOutputDriverInstance(context, jobInfo, fullPartSpec));
+          }
+        }
+
+        //      for (Entry<String,Map<String,String>> spec : partitionsDiscoveredByPath.entrySet()){
+        //        LOG.info("Partition "+ spec.getKey());
+        //        for (Entry<String,String> e : spec.getValue().entrySet()){
+        //          LOG.info(e.getKey() + "=>" +e.getValue());
+        //        }
+        //      }
+
+        this.partitionsDiscovered = true;
+      }
+    }
+
+
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -21,11 +21,14 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Map.Entry;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +57,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -66,9 +70,15 @@ import org.apache.thrift.TException;
  * and should be given as null. The value is the HCatRecord to write.*/
 public class HCatOutputFormat extends HCatBaseOutputFormat {
 
+//    static final private Log LOG = LogFactory.getLog(HCatOutputFormat.class);
+
     /** The directory under which data is initially written for a non partitioned table */
     protected static final String TEMP_DIR_NAME = "_TEMP";
-    private static Map<String, Token<DelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<DelegationTokenIdentifier>>();
+    
+    /** */
+    protected static final String DYNTEMP_DIR_NAME = "_DYN";
+    
+    private static Map<String, Token<? extends AbstractDelegationTokenIdentifier>> tokenMap = new HashMap<String, Token<? extends AbstractDelegationTokenIdentifier>>();
 
     private static final PathFilter hiddenFileFilter = new PathFilter(){
       public boolean accept(Path p){
@@ -76,6 +86,9 @@ public class HCatOutputFormat extends HC
         return !name.startsWith("_") && !name.startsWith(".");
       }
     };
+    
+    private static int maxDynamicPartitions;
+    private static boolean harRequested;
 
     /**
      * Set the info about the output to write for the Job. This queries the metadata server
@@ -90,17 +103,58 @@ public class HCatOutputFormat extends HC
 
       try {
 
-	Configuration conf = job.getConfiguration();
+        Configuration conf = job.getConfiguration();
         client = createHiveClient(outputInfo.getServerUri(), conf);
         Table table = client.getTable(outputInfo.getDatabaseName(), outputInfo.getTableName());
 
-        if( outputInfo.getPartitionValues() == null ) {
+        if (table.getPartitionKeysSize() == 0 ){
+          if ((outputInfo.getPartitionValues() != null) && (!outputInfo.getPartitionValues().isEmpty())){
+            // attempt made to save partition values in non-partitioned table - throw error.
+            throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES, 
+                "Partition values specified for non-partitioned table");
+          }
+          // non-partitioned table
           outputInfo.setPartitionValues(new HashMap<String, String>());
+          
         } else {
-          //Convert user specified map to have lower case key names
+          // partitioned table, we expect partition values
+          // convert user specified map to have lower case key names
           Map<String, String> valueMap = new HashMap<String, String>();
-          for(Map.Entry<String, String> entry : outputInfo.getPartitionValues().entrySet()) {
-            valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
+          if (outputInfo.getPartitionValues() != null){
+            for(Map.Entry<String, String> entry : outputInfo.getPartitionValues().entrySet()) {
+              valueMap.put(entry.getKey().toLowerCase(), entry.getValue());
+            }
+          }
+
+          if (
+              (outputInfo.getPartitionValues() == null)
+              || (outputInfo.getPartitionValues().size() < table.getPartitionKeysSize())
+          ){
+            // dynamic partition usecase - partition values were null, or not all were specified
+            // need to figure out which keys are not specified.
+            List<String> dynamicPartitioningKeys = new ArrayList<String>();
+            boolean firstItem = true;
+            for (FieldSchema fs : table.getPartitionKeys()){
+              if (!valueMap.containsKey(fs.getName().toLowerCase())){
+                dynamicPartitioningKeys.add(fs.getName().toLowerCase());
+              }
+            }
+            
+            if (valueMap.size() + dynamicPartitioningKeys.size() != table.getPartitionKeysSize()){
+              // If this isn't equal, then bogus key values have been inserted, error out.
+              throw new HCatException(ErrorType.ERROR_INVALID_PARTITION_VALUES,"Invalid partition keys specified");
+            }
+                        
+            outputInfo.setDynamicPartitioningKeys(dynamicPartitioningKeys);
+            String dynHash;
+            if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null){
+              dynHash = String.valueOf(Math.random());
+//              LOG.info("New dynHash : ["+dynHash+"]");
+//            }else{
+//              LOG.info("Old dynHash : ["+dynHash+"]");
+            }
+            conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
+
           }
 
           outputInfo.setPartitionValues(valueMap);
@@ -125,11 +179,13 @@ public class HCatOutputFormat extends HC
         String tblLocation = tblSD.getLocation();
         String location = driver.getOutputLocation(job,
             tblLocation, partitionCols,
-            outputInfo.getPartitionValues());
+            outputInfo.getPartitionValues(),conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
 
         //Serialize the output info into the configuration
         OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
                 tableSchema, tableSchema, storerInfo, location, table);
+        jobInfo.setHarRequested(harRequested);
+        jobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
         conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
 
         Path tblPath = new Path(tblLocation);
@@ -176,6 +232,7 @@ public class HCatOutputFormat extends HC
             // TableInfo, we can have as many tokens as there are stores and the TokenSelector
             // will correctly pick the right tokens which the committer will use and
             // cancel.
+            
             String tokenSignature = getTokenSignature(outputInfo);
             if(tokenMap.get(tokenSignature) == null) {
               // get delegation tokens from hcat server and store them into the "job"
@@ -183,19 +240,32 @@ public class HCatOutputFormat extends HC
               // hcat
               // when the JobTracker in Hadoop MapReduce starts supporting renewal of 
               // arbitrary tokens, the renewer should be the principal of the JobTracker
-              String tokenStrForm = client.getDelegationToken(ugi.getUserName());
-              Token<DelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
-              t.decodeFromUrlString(tokenStrForm);
-              t.setService(new Text(tokenSignature));
-              tokenMap.put(tokenSignature, t);
+              tokenMap.put(tokenSignature, HCatUtil.extractThriftToken(
+                  client.getDelegationToken(ugi.getUserName()),
+                  tokenSignature));
+            }
+
+            String jcTokenSignature = "jc."+tokenSignature;
+            if(tokenMap.get(jcTokenSignature) == null) {
+              tokenMap.put(jcTokenSignature,
+                  HCatUtil.getJobTrackerDelegationToken(conf,ugi.getUserName()));
             }
+            
             job.getCredentials().addToken(new Text(ugi.getUserName() + tokenSignature),
                 tokenMap.get(tokenSignature));
             // this will be used by the outputcommitter to pass on to the metastore client
             // which in turn will pass on to the TokenSelector so that it can select
             // the right token.
+            job.getCredentials().addToken(new Text(ugi.getUserName() + jcTokenSignature),
+                tokenMap.get(jcTokenSignature));
+            
             job.getConfiguration().set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE, tokenSignature);
-        }
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE, jcTokenSignature);
+            job.getConfiguration().set(HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM, tokenMap.get(jcTokenSignature).encodeToUrlString());
+
+//            LOG.info("Set hive dt["+tokenSignature+"]");
+//            LOG.info("Set jt dt["+jcTokenSignature+"]");
+          }
        }
       } catch(Exception e) {
         if( e instanceof HCatException ) {
@@ -207,10 +277,10 @@ public class HCatOutputFormat extends HC
         if( client != null ) {
           client.close();
         }
+//        HCatUtil.logAllTokens(LOG,job);
       }
     }
 
-
     // a signature string to associate with a HCatTableInfo - essentially
     // a concatenation of dbname, tablename and partition keyvalues.
     private static String getTokenSignature(HCatTableInfo outputInfo) {
@@ -232,11 +302,10 @@ public class HCatOutputFormat extends HC
       return result.toString();
     }
 
-
-
     /**
      * Handles duplicate publish of partition. Fails if partition already exists.
      * For non partitioned tables, fails if files are present in table directory.
+     * For dynamic partitioned publish, does nothing - check would need to be done at recordwriter time
      * @param job the job
      * @param outputInfo the output info
      * @param client the metastore client
@@ -247,18 +316,33 @@ public class HCatOutputFormat extends HC
      */
     private static void handleDuplicatePublish(Job job, HCatTableInfo outputInfo,
         HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException {
-      List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
-                  table, outputInfo.getPartitionValues());
+
+      /*
+       * For fully specified ptn, follow strict checks for existence of partitions in metadata
+       * For unpartitioned tables, follow filechecks
+       * For partially specified tables:
+       *    This would then need filechecks at the start of a ptn write,
+       *    Doing metadata checks can get potentially very expensive (fat conf) if 
+       *    there are a large number of partitions that match the partial specifications
+       */
 
       if( table.getPartitionKeys().size() > 0 ) {
-        //For partitioned table, fail if partition is already present
-        List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
-            outputInfo.getTableName(), partitionValues, (short) 1);
+        if (!outputInfo.isDynamicPartitioningUsed()){
+          List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
+              table, outputInfo.getPartitionValues());
+          // fully-specified partition
+          List<String> currentParts = client.listPartitionNames(outputInfo.getDatabaseName(),
+              outputInfo.getTableName(), partitionValues, (short) 1);
 
-        if( currentParts.size() > 0 ) {
-          throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+          if( currentParts.size() > 0 ) {
+            throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
+          }
         }
       } else {
+        List<String> partitionValues = HCatOutputCommitter.getPartitionValueList(
+            table, outputInfo.getPartitionValues());
+        // non-partitioned table
+        
         Path tablePath = new Path(table.getSd().getLocation());
         FileSystem fs = tablePath.getFileSystem(job.getConfiguration());
 
@@ -299,24 +383,12 @@ public class HCatOutputFormat extends HC
       getRecordWriter(TaskAttemptContext context
                       ) throws IOException, InterruptedException {
 
-      // First create the RW.
       HCatRecordWriter rw = new HCatRecordWriter(context);
-
-      // Now set permissions and group on freshly created files.
-      OutputJobInfo info =  getJobInfo(context);
-      Path workFile = rw.getStorageDriver().getWorkFilePath(context,info.getLocation());
-      Path tblPath = new Path(info.getTable().getSd().getLocation());
-      FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
-      FileStatus tblPathStat = fs.getFileStatus(tblPath);
-      fs.setPermission(workFile, tblPathStat.getPermission());
-      try{
-        fs.setOwner(workFile, null, tblPathStat.getGroup());
-      } catch(AccessControlException ace){
-        // log the messages before ignoring. Currently, logging is not built in HCat.
-      }
+      rw.prepareForStorageDriverOutput(context);
       return rw;
     }
 
+
     /**
      * Get the output committer for this output format. This is responsible
      * for ensuring the output is committed correctly.
@@ -329,10 +401,17 @@ public class HCatOutputFormat extends HC
     public OutputCommitter getOutputCommitter(TaskAttemptContext context
                                        ) throws IOException, InterruptedException {
         OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
-        return new HCatOutputCommitter(outputFormat.getOutputCommitter(context));
+        return new HCatOutputCommitter(context,outputFormat.getOutputCommitter(context));
     }
 
     static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
+      HiveConf hiveConf = getHiveConf(url, conf);
+//      HCatUtil.logHiveConf(LOG, hiveConf);
+      return new HiveMetaStoreClient(hiveConf);
+    }
+
+
+    private static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
       HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
 
       if( url != null ) {
@@ -372,9 +451,48 @@ public class HCatOutputFormat extends HC
         }
 
       }
+      
+      // figure out what the maximum number of partitions allowed is, so we can pass it on to our outputinfo
+      if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
+        maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+      }else{
+        maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions 
+      }
+      harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
+      return hiveConf;
+    }
 
-      return new HiveMetaStoreClient(hiveConf);
+    /**
+     * Any initialization of file paths, set permissions and group on freshly created files
+     * This is called at RecordWriter instantiation time which can be at write-time for  
+     * a dynamic partitioning usecase
+     * @param context
+     * @throws IOException
+     */
+    public static void prepareOutputLocation(HCatOutputStorageDriver osd, TaskAttemptContext context) throws IOException {
+      OutputJobInfo info =  HCatBaseOutputFormat.getJobInfo(context);
+//      Path workFile = osd.getWorkFilePath(context,info.getLocation());
+      Path workFile = osd.getWorkFilePath(context,context.getConfiguration().get("mapred.output.dir"));
+      Path tblPath = new Path(info.getTable().getSd().getLocation());
+      FileSystem fs = tblPath.getFileSystem(context.getConfiguration());
+      FileStatus tblPathStat = fs.getFileStatus(tblPath);
+      
+//      LOG.info("Attempting to set permission ["+tblPathStat.getPermission()+"] on ["+
+//          workFile+"], location=["+info.getLocation()+"] , mapred.locn =["+
+//          context.getConfiguration().get("mapred.output.dir")+"]"); 
+//
+//      FileStatus wFileStatus = fs.getFileStatus(workFile);
+//      LOG.info("Table : "+tblPathStat.getPath());
+//      LOG.info("Working File : "+wFileStatus.getPath());
+      
+      fs.setPermission(workFile, tblPathStat.getPermission());
+      try{
+        fs.setOwner(workFile, null, tblPathStat.getGroup());
+      } catch(AccessControlException ace){
+        // log the messages before ignoring. Currently, logging is not built in HCat.
+      }
     }
 
 
+
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java Fri Jul 22 23:38:07 2011
@@ -22,15 +22,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
@@ -40,6 +48,7 @@ import org.apache.hcatalog.data.schema.H
  */
 public abstract class HCatOutputStorageDriver {
 
+
   /**
    * Initialize the storage driver with specified properties, default implementation does nothing.
    * @param context the job context object
@@ -103,13 +112,22 @@ public abstract class HCatOutputStorageD
      * @param jobContext the job context object
      * @param tableLocation the location of the table
      * @param partitionValues the partition values
+     * @param dynHash A unique hash value that represents the dynamic partitioning job used
      * @return the location String.
      * @throws IOException Signals that an I/O exception has occurred.
      */
     public String getOutputLocation(JobContext jobContext,
-            String tableLocation, List<String> partitionCols, Map<String, String> partitionValues) throws IOException {
+            String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+      
+      String parentPath = tableLocation;
+      // For dynamic partitioned writes without all keyvalues specified, 
+      // we create a temp dir for the associated write job
+      if (dynHash != null){
+        parentPath = new Path(tableLocation, HCatOutputFormat.DYNTEMP_DIR_NAME+dynHash).toString();
+      }
 
-      if( partitionValues == null || partitionValues.size() == 0 ) {
+      // For non-partitioned tables, we send them to the temp dir
+      if((dynHash == null) && ( partitionValues == null || partitionValues.size() == 0 )) {
         return new Path(tableLocation, HCatOutputFormat.TEMP_DIR_NAME).toString();
       }
 
@@ -120,7 +138,7 @@ public abstract class HCatOutputStorageD
 
       String partitionLocation = FileUtils.makePartName(partitionCols, values);
 
-      Path path = new Path(tableLocation, partitionLocation);
+      Path path = new Path(parentPath, partitionLocation);
       return path.toString();
     }
 
@@ -130,4 +148,59 @@ public abstract class HCatOutputStorageD
     public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException{
       return new Path(new FileOutputCommitter(new Path(outputLoc), context).getWorkPath(), FileOutputFormat.getUniqueFile(context, "part",""));
     }
+
+    /**
+     * Implementation that calls the underlying output committer's setupJob, 
+     * used in lieu of underlying committer's setupJob when using dynamic partitioning
+     * The default implementation should be overriden by underlying implementations
+     * that do not use FileOutputCommitter.
+     * The reason this function exists is so as to allow a storage driver implementor to
+     * override underlying OutputCommitter's setupJob implementation to allow for
+     * being called multiple times in a job, to make it idempotent.
+     * This should be written in a manner that is callable multiple times 
+     * from individual tasks without stepping on each others' toes
+     * 
+     * @param context
+     * @throws InterruptedException 
+     * @throws IOException 
+     */
+    public void setupOutputCommitterJob(TaskAttemptContext context) 
+        throws IOException, InterruptedException{
+      getOutputFormat().getOutputCommitter(context).setupJob(context);
+    }
+
+    /**
+     * Implementation that calls the underlying output committer's cleanupJob, 
+     * used in lieu of underlying committer's cleanupJob when using dynamic partitioning
+     * This should be written in a manner that is okay to call after having had
+     * multiple underlying outputcommitters write to task dirs inside it.
+     * While the base MR cleanupJob should have sufficed normally, this is provided
+     * in order to let people implementing setupOutputCommitterJob to cleanup properly
+     * 
+     * @param context
+     * @throws IOException 
+     */
+    public void cleanupOutputCommitterJob(TaskAttemptContext context) 
+        throws IOException, InterruptedException{
+      getOutputFormat().getOutputCommitter(context).cleanupJob(context);
+    }
+
+    /**
+     * Implementation that calls the underlying output committer's abortJob, 
+     * used in lieu of underlying committer's abortJob when using dynamic partitioning
+     * This should be written in a manner that is okay to call after having had
+     * multiple underlying outputcommitters write to task dirs inside it.
+     * While the base MR cleanupJob should have sufficed normally, this is provided
+     * in order to let people implementing setupOutputCommitterJob to abort properly
+     * 
+     * @param context
+     * @param state
+     * @throws IOException 
+     */
+    public void abortOutputCommitterJob(TaskAttemptContext context, State state) 
+        throws IOException, InterruptedException{
+      getOutputFormat().getOutputCommitter(context).abortJob(context,state);
+    }
+
+    
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordWriter.java Fri Jul 22 23:38:07 2011
@@ -18,60 +18,174 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
 public class HCatRecordWriter extends RecordWriter<WritableComparable<?>, HCatRecord> {
 
     private final HCatOutputStorageDriver storageDriver;
-    /**
-     * @return the storageDriver
-     */
-    public HCatOutputStorageDriver getStorageDriver() {
-      return storageDriver;
-    }
+
+    private boolean dynamicPartitioningUsed = false;
+    
+//    static final private Log LOG = LogFactory.getLog(HCatRecordWriter.class);
 
     private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter;
+    private final Map<Integer,RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
+    private final Map<Integer,HCatOutputStorageDriver> baseDynamicStorageDrivers;
+
     private final List<Integer> partColsToDel;
+    private final List<Integer> dynamicPartCols;
+    private int maxDynamicPartitions;
+
+    private OutputJobInfo jobInfo;
+    private TaskAttemptContext context;
 
     public HCatRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
 
-      OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
+      jobInfo = HCatOutputFormat.getJobInfo(context);
+      this.context = context;
 
       // If partition columns occur in data, we want to remove them.
       partColsToDel = jobInfo.getPosOfPartCols();
+      dynamicPartitioningUsed = jobInfo.getTableInfo().isDynamicPartitioningUsed();
+      dynamicPartCols = jobInfo.getPosOfDynPartCols();
+      maxDynamicPartitions = jobInfo.getMaxDynamicPartitions();
 
-      if(partColsToDel == null){
+      if((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))){
         throw new HCatException("It seems that setSchema() is not called on " +
         		"HCatOutputFormat. Please make sure that method is called.");
       }
+      
+
+      if (!dynamicPartitioningUsed){
+        this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
+        this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context);
+        this.baseDynamicStorageDrivers = null;
+        this.baseDynamicWriters = null;
+      }else{
+        this.baseDynamicStorageDrivers = new HashMap<Integer,HCatOutputStorageDriver>();
+        this.baseDynamicWriters = new HashMap<Integer,RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+        this.storageDriver = null;
+        this.baseWriter = null;
+      }
 
-      this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
-      this.baseWriter = storageDriver.getOutputFormat().getRecordWriter(context);
+    }
+
+    /**
+     * @return the storageDriver
+     */
+    public HCatOutputStorageDriver getStorageDriver() {
+      return storageDriver;
     }
 
     @Override
     public void close(TaskAttemptContext context) throws IOException,
             InterruptedException {
+      if (dynamicPartitioningUsed){
+        for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
+          bwriter.close(context);
+        }
+        for (HCatOutputStorageDriver osd : baseDynamicStorageDrivers.values()){
+          OutputCommitter baseOutputCommitter = osd.getOutputFormat().getOutputCommitter(context);
+          if (baseOutputCommitter.needsTaskCommit(context)){
+            baseOutputCommitter.commitTask(context);
+          }
+        }
+      } else {
         baseWriter.close(context);
+      }
     }
 
     @Override
     public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
             InterruptedException {
+      RecordWriter<? super WritableComparable<?>, ? super Writable> localWriter;
+      HCatOutputStorageDriver localDriver;
+      
+//      HCatUtil.logList(LOG, "HCatRecord to write", value.getAll());
+
+      if (dynamicPartitioningUsed){
+        // calculate which writer to use from the remaining values - this needs to be done before we delete cols
+
+        List<String> dynamicPartValues = new ArrayList<String>();
+        for (Integer colToAppend :  dynamicPartCols){
+          dynamicPartValues.add(value.get(colToAppend).toString());
+        }
+        
+        int dynHashCode = dynamicPartValues.hashCode();
+        if (!baseDynamicWriters.containsKey(dynHashCode)){
+//          LOG.info("Creating new storage driver["+baseDynamicStorageDrivers.size()
+//              +"/"+maxDynamicPartitions+ "] for "+dynamicPartValues.toString());
+          if ((maxDynamicPartitions != -1) && (baseDynamicStorageDrivers.size() > maxDynamicPartitions)){
+            throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, 
+                "Number of dynamic partitions being created "
+                + "exceeds configured max allowable partitions["
+                + maxDynamicPartitions 
+                + "], increase parameter [" 
+                + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+                + "] if needed.");
+          }
+//          HCatUtil.logList(LOG, "dynamicpartvals", dynamicPartValues);
+//          HCatUtil.logList(LOG, "dynamicpartCols", dynamicPartCols);
+          
+          HCatOutputStorageDriver localOsd = createDynamicStorageDriver(dynamicPartValues);
+          RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter 
+            = localOsd.getOutputFormat().getRecordWriter(context);
+          localOsd.setupOutputCommitterJob(context);
+          OutputCommitter baseOutputCommitter = localOsd.getOutputFormat().getOutputCommitter(context);
+          baseOutputCommitter.setupTask(context);
+          prepareForStorageDriverOutput(localOsd,context);
+          baseDynamicWriters.put(dynHashCode, baseRecordWriter);
+          baseDynamicStorageDrivers.put(dynHashCode,localOsd);
+        }
+
+        localWriter = baseDynamicWriters.get(dynHashCode);
+        localDriver = baseDynamicStorageDrivers.get(dynHashCode);
+      }else{
+        localWriter = baseWriter;
+        localDriver = storageDriver;
+      }
 
       for(Integer colToDel : partColsToDel){
         value.remove(colToDel);
       }
-        //The key given by user is ignored
-        WritableComparable<?> generatedKey = storageDriver.generateKey(value);
-        Writable convertedValue = storageDriver.convertValue(value);
-        baseWriter.write(generatedKey, convertedValue);
+
+      //The key given by user is ignored
+      WritableComparable<?> generatedKey = localDriver.generateKey(value);
+      Writable convertedValue = localDriver.convertValue(value);
+      localWriter.write(generatedKey, convertedValue);
+    }
+
+    protected HCatOutputStorageDriver createDynamicStorageDriver(List<String> dynamicPartVals) throws IOException {
+      HCatOutputStorageDriver localOsd = HCatOutputFormat.getOutputDriverInstance(context,jobInfo,dynamicPartVals);
+      return localOsd;
+    }
+
+    public void prepareForStorageDriverOutput(TaskAttemptContext context) throws IOException {
+      // Set permissions and group on freshly created files.
+      if (!dynamicPartitioningUsed){
+        HCatOutputStorageDriver localOsd = this.getStorageDriver();
+        prepareForStorageDriverOutput(localOsd,context);
+      }
+    }
+
+    private void prepareForStorageDriverOutput(HCatOutputStorageDriver localOsd,
+        TaskAttemptContext context) throws IOException {
+      HCatOutputFormat.prepareOutputLocation(localOsd,context);
     }
 }