Posted to commits@hive.apache.org by na...@apache.org on 2010/04/15 01:36:09 UTC

svn commit: r934241 [2/16] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ metastore/src/java/org/apache/hadoop/hive/metastore/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql...

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Apr 14 23:36:07 2010
@@ -47,7 +47,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -77,12 +76,13 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ErrorMsg;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.io.SequenceFile;
@@ -213,6 +213,7 @@ public final class Utilities {
     protected boolean mutatesTo(Object oldInstance, Object newInstance) {
       return false;
     }
+    @Override
     protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
       java.util.Collection oldO = (java.util.Collection)oldInstance;
       java.util.Collection newO = (java.util.Collection)newInstance;
@@ -237,6 +238,7 @@ public final class Utilities {
     protected boolean mutatesTo(Object oldInstance, Object newInstance) {
       return false;
     }
+    @Override
     protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
       java.util.Collection oldO = (java.util.Collection)oldInstance;
       java.util.Collection newO = (java.util.Collection)newInstance;
@@ -262,6 +264,7 @@ public final class Utilities {
     protected boolean mutatesTo(Object oldInstance, Object newInstance) {
       return false;
     }
+    @Override
     protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
       java.util.Collection oldO = (java.util.Collection)oldInstance;
       java.util.Collection newO = (java.util.Collection)newInstance;
@@ -339,6 +342,7 @@ public final class Utilities {
   }
 
   public static   class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
+    @Override
     protected Expression instantiate(Object oldInstance, Encoder out) {
       return new Expression(oldInstance,
                             oldInstance.getClass(),
@@ -346,6 +350,7 @@ public final class Utilities {
                             null);
     }
 
+    @Override
     protected void initialize(Class type, Object oldInstance, Object newInstance,
                               Encoder out) {
       Iterator ite = ((Collection) oldInstance).iterator();
@@ -712,8 +717,7 @@ public final class Utilities {
     if (isCompressed) {
       Class<? extends CompressionCodec> codecClass = FileOutputFormat
           .getOutputCompressorClass(jc, DefaultCodec.class);
-      CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
-          codecClass, jc);
+      CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
       return codec.createOutputStream(out);
     } else {
       return (out);
@@ -736,8 +740,7 @@ public final class Utilities {
     } else {
       Class<? extends CompressionCodec> codecClass = FileOutputFormat
           .getOutputCompressorClass(jc, DefaultCodec.class);
-      CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
-          codecClass, jc);
+      CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
       return codec.getDefaultExtension();
     }
   }
@@ -967,20 +970,30 @@ public final class Utilities {
   private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
 
   /**
+   * A local job name looks like "job_local_1_map_0000", where 1 is the job ID and 0000 is the task ID.
+   */
+  private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*local.*_([0-9]*)$");
+
+  /**
    * Get the task id from the filename. E.g., get "000000" out of
    * "24931_r_000000_0" or "24931_r_000000_0.gz"
    */
   public static String getTaskIdFromFilename(String filename) {
+    String taskId = filename;
     Matcher m = fileNameTaskIdRegex.matcher(filename);
     if (!m.matches()) {
-      LOG.warn("Unable to get task id from file name: " + filename
-          + ". Using full filename as task id.");
-      return filename;
+      Matcher m2 = fileNameLocalTaskIdRegex.matcher(filename);
+      if (!m2.matches()) {
+        LOG.warn("Unable to get task id from file name: " + filename
+            + ". Using full filename as task id.");
+      } else {
+        taskId = m2.group(1);
+      }
     } else {
-      String taskId = m.group(1);
-      LOG.debug("TaskId for " + filename + " = " + taskId);
-      return taskId;
+      taskId = m.group(1);
     }
+    LOG.debug("TaskId for " + filename + " = " + taskId);
+    return taskId;
   }
 
   /**
@@ -990,6 +1003,11 @@ public final class Utilities {
    */
   public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
     String taskId = getTaskIdFromFilename(filename);
+    String newTaskId = replaceTaskId(taskId, bucketNum);
+    return replaceTaskIdFromFilename(filename, taskId, newTaskId);
+  }
+
+  public static String replaceTaskId(String taskId, int bucketNum) {
     String strBucketNum = String.valueOf(bucketNum);
     int bucketNumLen = strBucketNum.length();
     int taskIdLen = taskId.length();
@@ -997,17 +1015,30 @@ public final class Utilities {
     for (int i = 0; i < taskIdLen - bucketNumLen; i++) {
         s.append("0");
     }
-    String newTaskId = s.toString() + strBucketNum;
-    String[] spl = filename.split(taskId);
+    return s.toString() + strBucketNum;
+  }
+
+  /**
+   * Replace the oldTaskId appearing in the filename with the newTaskId.
+   * The string oldTaskId may appear multiple times; only the last occurrence is replaced.
+   * @param filename
+   * @param oldTaskId
+   * @param newTaskId
+   * @return the filename with the last occurrence of oldTaskId replaced by newTaskId
+   */
+  public static String replaceTaskIdFromFilename(String filename,
+      String oldTaskId, String newTaskId) {
+
+    String[] spl = filename.split(oldTaskId);
 
     if ((spl.length == 0) || (spl.length == 1)) {
-      return filename.replaceAll(taskId, newTaskId);
+      return filename.replaceAll(oldTaskId, newTaskId);
     }
 
     StringBuffer snew = new StringBuffer();
     for (int idx = 0; idx < spl.length-1; idx++) {
       if (idx > 0) {
-        snew.append(taskId);
+        snew.append(oldTaskId);
       }
       snew.append(spl[idx]);
     }
@@ -1017,21 +1048,88 @@ public final class Utilities {
   }
 
   /**
+   * Get the file statuses under a root path, descending a given number of directory levels.
+   * @param path the root path
+   * @param level the number of directory levels to descend
+   * @param fs the file system
+   * @return array of FileStatus
+   * @throws IOException
+   */
+  public static FileStatus[] getFileStatusRecurse(Path path, int level,
+      FileSystem fs) throws IOException {
+
+    // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
+    StringBuilder sb = new StringBuilder(path.toUri().getPath());
+    for (int i = 0; i < level; ++i) {
+      sb.append(Path.SEPARATOR).append("*");
+    }
+    Path pathPattern = new Path(path, sb.toString());
+    return fs.globStatus(pathPattern);
+  }
+
+  /**
+   * Remove all temporary files and duplicate (double-committed) files from a
+   * given directory.
+   */
+  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
+    removeTempOrDuplicateFiles(fs, path, null);
+  }
+
+  /**
    * Remove all temporary files and duplicate (double-committed) files from a
    * given directory.
+   * @return a list of path names for empty buckets that should be created.
    */
-  public static void removeTempOrDuplicateFiles(FileSystem fs, Path path)
+  public static ArrayList<String> removeTempOrDuplicateFiles(FileSystem fs, Path path, DynamicPartitionCtx dpCtx)
       throws IOException {
     if (path == null) {
-      return;
+      return null;
     }
 
-    FileStatus[] items = fs.listStatus(path);
-    if (items == null) {
-      return;
+    ArrayList<String> result = new ArrayList<String>();
+    if (dpCtx != null) {
+      FileStatus parts[] = getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+      HashMap<String, FileStatus> taskIDToFile = null;
+
+      for (int i = 0; i < parts.length; ++i) {
+        assert parts[i].isDir(): "dynamic partition " + parts[i].getPath() + " is not a directory";
+        FileStatus[] items = fs.listStatus(parts[i].getPath());
+        taskIDToFile = removeTempOrDuplicateFiles(items, fs);
+        // if the table is bucketed and bucketing is enforced, check for missing buckets and record the empty ones to be created
+        if (dpCtx.getNumBuckets() > 0 && taskIDToFile != null) {
+          // refresh the file list
+          items = fs.listStatus(parts[i].getPath());
+          // get the missing buckets and generate empty buckets
+          String taskID1 = taskIDToFile.keySet().iterator().next();
+          Path bucketPath = taskIDToFile.values().iterator().next().getPath();
+          for (int j = 0; j < dpCtx.getNumBuckets(); ++j ) {
+            String taskID2 = replaceTaskId(taskID1, j);
+            if (!taskIDToFile.containsKey(taskID2)) {
+              // create empty bucket, file name should be derived from taskID2
+              String path2 = replaceTaskIdFromFilename(bucketPath.toUri().getPath().toString(), j);
+              result.add(path2);
+            }
+          }
+        }
+      }
+    } else {
+      FileStatus[] items = fs.listStatus(path);
+      removeTempOrDuplicateFiles(items, fs);
+    }
+    return result;
+  }
+
+  public static HashMap<String, FileStatus> removeTempOrDuplicateFiles(
+      FileStatus[] items, FileSystem fs)
+      throws IOException {
+
+    if (items == null || fs == null) {
+      return null;
     }
 
     HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
+
     for (FileStatus one : items) {
       if (isTempPath(one)) {
         if (!fs.delete(one.getPath(), true)) {
@@ -1053,6 +1151,7 @@ public final class Utilities {
         }
       }
     }
+    return taskIdToFile;
   }
 
   public static String getNameMessage(Exception e) {
@@ -1230,4 +1329,5 @@ public final class Utilities {
       job.set(entry.getKey(), entry.getValue());
     }
   }
+
 }
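
A quick, self-contained illustration of the task-id handling above (not the Hive class itself; the class name and sample file names are this note's own): the two regexes extract the trailing task id, and the zero-padded replacement is what derives file names for missing empty buckets.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TaskIdSketch {
      // cluster file names such as "24931_r_000000_0" or "24931_r_000000_0.gz"
      static final Pattern CLUSTER = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
      // local-mode file names such as "job_local_1_map_0000"
      static final Pattern LOCAL = Pattern.compile(".*local.*_([0-9]*)$");

      // extract the task id, falling back to the whole file name
      static String taskId(String filename) {
        Matcher m = CLUSTER.matcher(filename);
        if (m.matches()) {
          return m.group(1);
        }
        Matcher m2 = LOCAL.matcher(filename);
        return m2.matches() ? m2.group(1) : filename;
      }

      // pad the bucket number with leading zeros to the width of the old task id
      static String replaceTaskId(String taskId, int bucketNum) {
        String b = String.valueOf(bucketNum);
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < taskId.length() - b.length(); i++) {
          s.append('0');
        }
        return s.append(b).toString();
      }

      public static void main(String[] args) {
        System.out.println(taskId("24931_r_000000_0"));     // 000000
        System.out.println(taskId("job_local_1_map_0000")); // 0000
        System.out.println(replaceTaskId("000000", 12));    // 000012
      }
    }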

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Wed Apr 14 23:36:07 2010
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -50,27 +50,30 @@ import org.apache.hadoop.util.Reflection
 
 /**
  * Simple persistent container for rows.
- * 
+ *
  * This container interface only accepts adding or appending new rows and
  * iterating through the rows in the order of their insertions.
- * 
+ *
  * The iterator interface is a lightweight first()/next() API rather than the
  * Java Iterator interface. This way we do not need to create an Iterator object
  * every time we want to start a new iteration. Below is simple example of how
- * to convert a typical Java's Iterator code to the LW iterator iterface.
- * 
- * Itereator itr = rowContainer.iterator(); while (itr.hasNext()) { v =
- * itr.next(); // do anything with v }
- * 
+ * to convert a typical Java's Iterator code to the LW iterator interface.
+ *
+ * Iterator itr = rowContainer.iterator();
+ * while (itr.hasNext()) {
+ *   v = itr.next(); // do anything with v
+ * }
+ *
  * can be rewritten to:
- * 
- * for ( v = rowContainer.first(); v != null; v = rowContainer.next()) { // do
- * anything with v }
- * 
+ *
+ * for ( v = rowContainer.first(); v != null; v = rowContainer.next()) {
+ *   // do anything with v
+ * }
+ *
  * Once the first is called, it will not be able to write again. So there can
  * not be any writes after read. It can be read multiple times, but it does not
  * support multiple reader interleaving reading.
- * 
+ *
  */
 public class RowContainer<Row extends List<Object>> {
 
@@ -82,7 +85,7 @@ public class RowContainer<Row extends Li
   private Row[] currentWriteBlock; // the last block that add() should append to
   private Row[] currentReadBlock; // the current block where the cursor is in
   // since currentReadBlock may assigned to currentWriteBlock, we need to store
-  // orginal read block
+  // original read block
   private Row[] firstReadBlockPointer;
   private int blockSize; // number of objects in the block before it is spilled
   // to disk
@@ -201,7 +204,7 @@ public class RowContainer<Row extends Li
       } else {
         if (inputSplits == null) {
           if (this.inputFormat == null) {
-            inputFormat = (InputFormat<WritableComparable, Writable>)ReflectionUtils
+            inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils
                 .newInstance(tblDesc.getInputFileFormatClass(),
                 jobCloneUsingLocalFs);
           }
@@ -355,7 +358,7 @@ public class RowContainer<Row extends Li
 
   /**
    * Get the number of elements in the RowContainer.
-   * 
+   *
    * @return number of elements in the RowContainer
    */
   public int size() {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java Wed Apr 14 23:36:07 2010
@@ -60,10 +60,10 @@ public class WriteEntity implements Seri
 
   /**
    * This is derived from t and p, but we need to serialize this field to make sure
-   * WriteEntity.hashCode() does not need to recursively read into t and p. 
+   * WriteEntity.hashCode() does not need to recursively read into t and p.
    */
   private String name;
-  
+
   public String getName() {
     return name;
   }
@@ -109,10 +109,10 @@ public class WriteEntity implements Seri
    */
   public WriteEntity() {
   }
-  
+
   /**
    * Constructor for a table.
-   * 
+   *
    * @param t
    *          Table that is written to.
    */
@@ -126,7 +126,7 @@ public class WriteEntity implements Seri
 
   /**
    * Constructor for a partition.
-   * 
+   *
    * @param p
    *          Partition that is written to.
    */
@@ -140,7 +140,7 @@ public class WriteEntity implements Seri
 
   /**
    * Constructor for a file.
-   * 
+   *
    * @param d
    *          The name of the directory that is being written to.
    * @param islocal
@@ -205,7 +205,7 @@ public class WriteEntity implements Seri
   public String toString() {
     return name;
   }
-  
+
   private String computeName() {
     switch (typ) {
     case TABLE:

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Wed Apr 14 23:36:07 2010
@@ -29,13 +29,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Apr 14 23:36:07 2010
@@ -19,8 +19,8 @@
 package org.apache.hadoop.hive.ql.metadata;
 
 import java.io.IOException;
-import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -537,7 +537,7 @@ public class Hive {
    *          The temporary directory.
    */
   public void loadPartition(Path loadPath, String tableName,
-      AbstractMap<String, String> partSpec, boolean replace, Path tmpDirPath)
+      Map<String, String> partSpec, boolean replace, Path tmpDirPath)
       throws HiveException {
     Table tbl = getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
     try {
@@ -558,8 +558,8 @@ public class Hive {
         // extrapolated from
         // the table's location (even if the table is marked external)
         fs = FileSystem.get(tbl.getDataLocation(), getConf());
-        partPath = new Path(tbl.getDataLocation().getPath(), Warehouse
-            .makePartName(partSpec));
+        partPath = new Path(tbl.getDataLocation().getPath(),
+            Warehouse.makePartName(partSpec));
       } else {
         // Partition exists already. Get the path from the partition. This will
         // get the default path for Hive created partitions or the external path
@@ -588,6 +588,62 @@ public class Hive {
   }
 
   /**
+   * Given the source directory of the load path, load all dynamically generated partitions
+   * into the specified table and return the full partition spec of each partition that was
+   * loaded.
+   * @param loadPath
+   * @param tableName
+   * @param partSpec
+   * @param replace
+   * @param tmpDirPath
+   * @param numDP number of dynamic partition columns in the partition spec
+   * @return a list of full partition specs, one per dynamically created partition
+   * @throws HiveException
+   */
+  public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
+      String tableName, Map<String, String> partSpec, boolean replace,
+      Path tmpDirPath, int numDP)
+      throws HiveException {
+
+    try {
+      ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
+        new ArrayList<LinkedHashMap<String, String>>();
+
+      FileSystem fs = loadPath.getFileSystem(conf);
+      FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDP, fs);
+
+      if (status.length > conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)) {
+        throw new HiveException("Number of dynamic partitions created is " + status.length
+            + ", which is more than "
+            + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS)
+            + ". To solve this try to set " + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
+            + " to at least " + status.length + '.');
+      }
+
+      // for each dynamically created DP directory, construct a full partition spec
+      // and load the partition based on that
+      for (int i = 0; i < status.length; ++i) {
+        // get the dynamically created directory
+        Path partPath = status[i].getPath();
+        assert fs.getFileStatus(partPath).isDir():
+          "partition " + partPath + " is not a directory!";
+
+        // generate a full partition specification
+        LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
+        Warehouse.makeSpecFromName(fullPartSpec, partPath);
+        fullPartSpecs.add(fullPartSpec);
+
+        // finally load the partition -- move the file to the final table address
+        loadPartition(partPath, tableName, fullPartSpec, replace, tmpDirPath);
+        LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
+      }
+      return fullPartSpecs;
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
    * Load a directory into a Hive Table. - Alters existing content of table with
    * the contents of loadPath. - If table does not exist - an exception is
    * thrown - files in loadPath are moved into Hive. But the directory itself is
@@ -687,11 +743,14 @@ public class Hive {
     List<String> pvals = new ArrayList<String>();
     for (FieldSchema field : tbl.getPartCols()) {
       String val = partSpec.get(field.getName());
-      if (val == null || val.length() == 0) {
+      // enable dynamic partitioning
+      if ((val == null && !HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING))
+          || (val != null && val.length() == 0)) {
         throw new HiveException("get partition: Value for key "
             + field.getName() + " is null or empty");
+      } else if (val != null) {
+        pvals.add(val);
       }
-      pvals.add(val);
     }
     org.apache.hadoop.hive.metastore.api.Partition tpart = null;
     try {
@@ -700,7 +759,6 @@ public class Hive {
         LOG.debug("creating partition for table " + tbl.getTableName()
             + " with partition spec : " + partSpec);
         tpart = getMSC().appendPartition(tbl.getDbName(), tbl.getTableName(), pvals);
-        ;
       }
       if (tpart == null) {
         return null;
@@ -845,8 +903,8 @@ public class Hive {
             continue;
           }
           if (item.isDir()) {
-            throw new HiveException("checkPaths: " + src.toString()
-                + " has nested directory" + item.toString());
+            throw new HiveException("checkPaths: " + src.getPath()
+                + " has nested directory " + item.getPath());
           }
           Path tmpDest = new Path(destf, item.getPath().getName());
           if (!replace && fs.exists(tmpDest)) {
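
The new loadDynamicPartitions above walks the load path numDP directory levels deep, turns each leaf directory back into partition column/value pairs (via Warehouse.makeSpecFromName), and then calls loadPartition per directory. A rough sketch of that path-to-spec step, assuming the usual name=value directory layout; the helper below is this note's own, not Hive's API, and it ignores the value escaping the real Warehouse handles.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DynamicPartPathSketch {
      // parse the last `levels` path components (e.g. "ds=2010-03-03/hr=12")
      // into an ordered partition spec
      static LinkedHashMap<String, String> specFromPath(String path, int levels) {
        String[] comps = path.split("/");
        LinkedHashMap<String, String> spec = new LinkedHashMap<String, String>();
        for (int i = comps.length - levels; i < comps.length; i++) {
          String[] kv = comps[i].split("=", 2);
          spec.put(kv[0], kv[1]);
        }
        return spec;
      }

      public static void main(String[] args) {
        Map<String, String> spec =
            specFromPath("/tmp/hive-scratch/ds=2010-03-03/hr=12", 2);
        System.out.println(spec); // {ds=2010-03-03, hr=12}
      }
    }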

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Wed Apr 14 23:36:07 2010
@@ -21,12 +21,12 @@ package org.apache.hadoop.hive.ql.parse;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,10 +69,10 @@ public abstract class BaseSemanticAnalyz
 
   protected Context ctx;
   protected HashMap<String, String> idToTableNameMap;
-  
+
   public static int HIVE_COLUMN_ORDER_ASC = 1;
   public static int HIVE_COLUMN_ORDER_DESC = 0;
-  
+
   /**
    * ReadEntitites that are passed to the hooks.
    */
@@ -336,7 +336,7 @@ public abstract class BaseSemanticAnalyz
   protected List<FieldSchema> getColumns(ASTNode ast) throws SemanticException {
     return getColumns(ast, true);
   }
-  
+
   /**
    * Get the list of FieldSchema out of the ASTNode.
    */
@@ -437,14 +437,16 @@ public abstract class BaseSemanticAnalyz
   public static class tableSpec {
     public String tableName;
     public Table tableHandle;
-    public HashMap<String, String> partSpec;
+    public Map<String, String> partSpec; // has to use LinkedHashMap to enforce order
     public Partition partHandle;
+    public int numDynParts; // number of dynamic partition columns
 
     public tableSpec(Hive db, HiveConf conf, ASTNode ast)
         throws SemanticException {
 
       assert (ast.getToken().getType() == HiveParser.TOK_TAB);
       int childIndex = 0;
+      numDynParts = 0;
 
       try {
         // get table metadata
@@ -468,28 +470,63 @@ public abstract class BaseSemanticAnalyz
       if (ast.getChildCount() == 2) {
         childIndex = 1;
         ASTNode partspec = (ASTNode) ast.getChild(1);
-        partSpec = new LinkedHashMap<String, String>();
+        // partSpec is a mapping from partition column name to its value.
+        partSpec = new LinkedHashMap<String, String>(partspec.getChildCount());
         for (int i = 0; i < partspec.getChildCount(); ++i) {
           ASTNode partspec_val = (ASTNode) partspec.getChild(i);
-          String val = stripQuotes(partspec_val.getChild(1).getText());
-          partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText()
-              .toLowerCase()), val);
+          String val = null;
+          if (partspec_val.getChildCount() < 2) { // DP in the form of T partition (ds, hr)
+            ++numDynParts;
+          } else { // in the form of T partition (ds="2010-03-03")
+            val = stripQuotes(partspec_val.getChild(1).getText());
+          }
+          partSpec.put(unescapeIdentifier(partspec_val.getChild(0).getText().toLowerCase()), val);
         }
-        try {
-          // In case the partition already exists, we need to get the partition
-          // data from the metastore
-          partHandle = db.getPartition(tableHandle, partSpec, false);
-          if (partHandle == null) {
-            // this doesn't create partition. partition is created in MoveTask
-            partHandle = new Partition(tableHandle, partSpec, null);
+        // check if the partition spec is valid
+        if (numDynParts > 0) {
+          List<FieldSchema> parts = tableHandle.getPartitionKeys();
+          int numStaPart = parts.size() - numDynParts;
+          if (numStaPart == 0 &&
+              conf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+            throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
           }
-        } catch (HiveException e) {
-          throw new SemanticException(ErrorMsg.INVALID_PARTITION.getMsg(ast
-              .getChild(childIndex)));
+          for (FieldSchema fs: parts) {
+            if (partSpec.get(fs.getName().toLowerCase()) == null) {
+              if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
+                throw new SemanticException(
+                    ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg(ast.getChild(childIndex)));
+              }
+              break;
+            } else {
+              --numStaPart;
+            }
+          }
+          partHandle = null;
+        } else {
+          try {
+            // In case the partition already exists, we need to get the partition
+            // data from the metastore
+            partHandle = db.getPartition(tableHandle, partSpec, false);
+            if (partHandle == null) {
+              // this doesn't create partition. partition is created in MoveTask
+              partHandle = new Partition(tableHandle, partSpec, null);
+            }
+          } catch (HiveException e) {
+            throw new SemanticException(
+                ErrorMsg.INVALID_PARTITION.getMsg(ast.getChild(childIndex)));
+          }
         }
       }
     }
 
+    public Map<String, String> getPartSpec() {
+      return this.partSpec;
+    }
+
+    public void setPartSpec(Map<String, String> partSpec) {
+      this.partSpec = partSpec;
+    }
+
     @Override
     public String toString() {
       if (partHandle != null) {
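
The partitionVal grammar change later in this commit makes the value optional, so both PARTITION (ds='2010-03-03', hr='12') and PARTITION (ds='2010-03-03', hr) now parse; tableSpec then treats the value-less columns as dynamic and enforces two rules: static columns must precede dynamic ones, and strict mode requires at least one static column. Below is an illustrative re-statement of those checks (this note's own class, using plain exceptions instead of SemanticException, and assuming the spec iterates in partition-key order as the LinkedHashMap above is meant to guarantee).

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartSpecRuleSketch {
      // throws if a dynamic (null-valued) column precedes a static one, or if
      // strict mode is on and every partition column is dynamic
      static void validate(LinkedHashMap<String, String> partSpec, boolean strict) {
        boolean sawDynamic = false;
        boolean sawStatic = false;
        for (Map.Entry<String, String> e : partSpec.entrySet()) {
          if (e.getValue() == null) {        // dynamic partition column
            sawDynamic = true;
          } else {                           // static partition column
            if (sawDynamic) {
              throw new IllegalArgumentException(
                  "Dynamic partition cannot be the parent of a static partition");
            }
            sawStatic = true;
          }
        }
        if (strict && sawDynamic && !sawStatic) {
          throw new IllegalArgumentException(
              "Strict mode requires at least one static partition column");
        }
      }

      public static void main(String[] args) {
        LinkedHashMap<String, String> ok = new LinkedHashMap<String, String>();
        ok.put("ds", "2010-03-03");
        ok.put("hr", null);
        validate(ok, true);                  // accepted: static before dynamic

        LinkedHashMap<String, String> bad = new LinkedHashMap<String, String>();
        bad.put("ds", null);
        bad.put("hr", "12");
        validate(bad, true);                 // throws: dynamic before static
      }
    }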

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java Wed Apr 14 23:36:07 2010
@@ -137,6 +137,15 @@ public enum ErrorMsg {
   VIEW_COL_MISMATCH("The number of columns produced by the SELECT clause does not match the "
       + "number of column names specified by CREATE VIEW"),
   DML_AGAINST_VIEW("A view cannot be used as target table for LOAD or INSERT"),
+  PARTITION_DYN_STA_ORDER("Dynamic partition cannot be the parent of a static partition"),
+  DYNAMIC_PARTITION_DISABLED("Dynamic partition is disabled. Either enable it by setting "
+      + "hive.exec.dynamic.partition=true or specify partition column values"),
+  DYNAMIC_PARTITION_STRICT_MODE("Dynamic partition strict mode requires at least one "
+      + "static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict"),
+  DYNAMIC_PARTITION_MERGE("Dynamic partition does not support merging mapfiles/mapredfiles yet. "
+      + "Please set hive.merge.mapfiles and hive.merge.mapredfiles to false or use static "
+      + "partitions"),
+  NONEXISTPARTCOL("Partition column in the partition specification does not exist"),
   UNSUPPORTED_TYPE("DATE, DATETIME, and TIMESTAMP types aren't supported yet. Please use "
       + "STRING instead."),
   CREATE_NON_NATIVE_AS("CREATE TABLE AS SELECT cannot be used for a non-native table"),
@@ -168,7 +177,7 @@ public enum ErrorMsg {
    * is not found or <code>ErrorMsg</code> has no <code>SQLState</code>, returns
    * the <code>SQLState</code> bound to the <code>GENERIC_ERROR</code>
    * <code>ErrorMsg</code>.
-   * 
+   *
    * @param mesg
    *          An error message string
    * @return SQLState

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g Wed Apr 14 23:36:07 2010
@@ -1391,7 +1391,7 @@ partitionSpec
 
 partitionVal
     :
-    Identifier EQUAL constant -> ^(TOK_PARTVAL Identifier constant)
+    Identifier (EQUAL constant)? -> ^(TOK_PARTVAL Identifier constant?)
     ;
 
 sysFuncNames

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java Wed Apr 14 23:36:07 2010
@@ -22,8 +22,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang.StringUtils;
@@ -227,10 +228,12 @@ public class LoadSemanticAnalyzer extend
     // create final load/move work
 
     String loadTmpPath = ctx.getExternalTmpFileURI(toURI);
+    Map<String, String> partSpec = ts.getPartSpec();
+    if (partSpec == null) {
+      partSpec = new LinkedHashMap<String, String>();
+    }
     LoadTableDesc loadTableWork = new LoadTableDesc(fromURI.toString(),
-        loadTmpPath, Utilities.getTableDesc(ts.tableHandle),
-        (ts.partSpec != null) ? ts.partSpec : new HashMap<String, String>(),
-        isOverWrite);
+        loadTmpPath, Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
 
     if (rTask != null) {
       rTask.addDependentTask(TaskFactory.get(new MoveWork(getInputs(),

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java Wed Apr 14 23:36:07 2010
@@ -19,15 +19,17 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 
 /**
  * Implementation of the metadata information related to a query block.
- * 
+ *
  **/
 
 public class QBMetaData {
@@ -44,6 +46,8 @@ public class QBMetaData {
   private final HashMap<String, Partition> nameToDestPartition;
   private final HashMap<String, String> nameToDestFile;
   private final HashMap<String, Integer> nameToDestType;
+  private final HashMap<String, Map<String, String>> aliasToPartSpec;
+  private final HashMap<String, DynamicPartitionCtx> aliasToDPCtx;
 
   @SuppressWarnings("unused")
   private static final Log LOG = LogFactory.getLog(QBMetaData.class.getName());
@@ -54,6 +58,8 @@ public class QBMetaData {
     nameToDestPartition = new HashMap<String, Partition>();
     nameToDestFile = new HashMap<String, String>();
     nameToDestType = new HashMap<String, Integer>();
+    aliasToPartSpec = new HashMap<String, Map<String, String>>();
+    aliasToDPCtx = new HashMap<String, DynamicPartitionCtx>();
   }
 
   // All getXXX needs toLowerCase() because they are directly called from
@@ -109,4 +115,21 @@ public class QBMetaData {
   public Table getSrcForAlias(String alias) {
     return aliasToTable.get(alias.toLowerCase());
   }
+
+  public Map<String, String> getPartSpecForAlias(String alias) {
+    return aliasToPartSpec.get(alias);
+  }
+
+  public void setPartSpecForAlias(String alias, Map<String, String> partSpec) {
+    aliasToPartSpec.put(alias, partSpec);
+  }
+
+  public void setDPCtx(String alias, DynamicPartitionCtx dpCtx) {
+    aliasToDPCtx.put(alias, dpCtx);
+  }
+
+  public DynamicPartitionCtx getDPCtx(String alias) {
+    return aliasToDPCtx.get(alias);
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Apr 14 23:36:07 2010
@@ -31,9 +31,9 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.Map.Entry;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -88,6 +88,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMROperator;
 import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink1;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink2;
 import org.apache.hadoop.hive.ql.optimizer.GenMRRedSink3;
@@ -97,7 +98,6 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.MapJoinFactory;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
-import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMapRedCtx;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -107,6 +107,7 @@ import org.apache.hadoop.hive.ql.plan.Cr
 import org.apache.hadoop.hive.ql.plan.CreateTableLikeDesc;
 import org.apache.hadoop.hive.ql.plan.CreateViewDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -116,6 +117,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
@@ -136,12 +138,11 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.UDTFDesc;
 import org.apache.hadoop.hive.ql.plan.UnionDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
@@ -149,9 +150,9 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -779,15 +780,23 @@ public class SemanticAnalyzer extends Ba
                 .getMsg(ast, "The class is " + outputFormatClass.toString()));
           }
 
-          if (ts.partSpec == null) {
+          // tableSpec ts comes from the query (user specified). partHandle ==
+          // null means the user did not specify a full static partition in the
+          // query, but the table itself may still be partitioned.
+          if (ts.partHandle == null) {
             // This is a table
             qb.getMetaData().setDestForAlias(name, ts.tableHandle);
+            // has dynamic as well as static partitions
+            if (ts.partSpec != null && ts.partSpec.size() > 0) {
+              qb.getMetaData().setPartSpecForAlias(name, ts.partSpec);
+            }
           } else {
             // This is a partition
             qb.getMetaData().setDestForAlias(name, ts.partHandle);
           }
           break;
         }
+
         case HiveParser.TOK_LOCAL_DIR:
         case HiveParser.TOK_DIR: {
           // This is a dfs file
@@ -3152,25 +3161,65 @@ public class SemanticAnalyzer extends Ba
     int currentTableId = 0;
     boolean isLocal = false;
     SortBucketRSCtx rsCtx = new SortBucketRSCtx();
+    DynamicPartitionCtx dpCtx = null;
     LoadTableDesc ltd = null;
 
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
 
       dest_tab = qbm.getDestTableForAlias(dest);
+      Map<String, String> partSpec = qbm.getPartSpecForAlias(dest);
+      dest_path = dest_tab.getPath();
 
       // check for partition
       List<FieldSchema> parts = dest_tab.getPartitionKeys();
-      if (parts != null && parts.size() > 0) {
-        throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
+      if (parts != null && parts.size() > 0) { // table is partitioned
+        if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition
+          throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
+        }
+        dpCtx = qbm.getDPCtx(dest);
+        if (dpCtx == null) {
+          validatePartSpec(dest_tab, partSpec);
+          dpCtx = new DynamicPartitionCtx(dest_tab, partSpec,
+              conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
+              conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+          qbm.setDPCtx(dest, dpCtx);
+        }
+
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING)) { // allow DP
+          // TODO: we should support merge files for dynamically generated partitions later
+          if (dpCtx.getNumDPCols() > 0 &&
+              (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES) ||
+               HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES))) {
+            HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
+            HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
+          }
+          // turn on hive.task.progress to update # of partitions created to the JT
+          HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVEJOBPROGRESS, true);
+
+        } else { // QBMetaData.DEST_PARTITION captures the all-SP case
+          throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg());
+        }
+        if (dpCtx.getSPPath() != null) {
+          dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath());
+        }
+        if ((dest_tab.getNumBuckets() > 0) &&
+            (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
+          dpCtx.setNumBuckets(dest_tab.getNumBuckets());
+        }
       }
-      dest_path = dest_tab.getPath();
+
       boolean isNonNativeTable = dest_tab.isNonNative();
       if (isNonNativeTable) {
         queryTmpdir = dest_path.toUri().getPath();
       } else {
         queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
       }
+      if (dpCtx != null) {
+        // set the root of the temporary path under which the dynamic partition directories will be created
+        dpCtx.setRootPath(queryTmpdir);
+      }
+      // this table_desc does not contain the partitioning columns
       table_desc = Utilities.getTableDesc(dest_tab);
 
       // Add sorting/bucketing if needed
@@ -3181,13 +3230,18 @@ public class SemanticAnalyzer extends Ba
       destTableId++;
 
       // Create the work for moving the table
+      // NOTE: specify Dynamic partitions in dest_tab for WriteEntity
       if (!isNonNativeTable) {
-        ltd = new LoadTableDesc(queryTmpdir, ctx
-            .getExternalTmpFileURI(dest_path.toUri()), table_desc,
-            new HashMap<String, String>());
+        ltd = new LoadTableDesc(queryTmpdir, ctx.getExternalTmpFileURI(dest_path.toUri()),
+            table_desc, dpCtx);
         loadTableWork.add(ltd);
       }
-      if (!outputs.add(new WriteEntity(dest_tab))) {
+
+      // Register the whole table as a WriteEntity for the post-exec hooks only
+      // when no dynamic partitions are present; with DP, the WriteEntity is
+      // registered in MoveTask once the list of created partitions is known.
+      if ((dpCtx == null || dpCtx.getNumDPCols() == 0) &&
+          !outputs.add(new WriteEntity(dest_tab))) {
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
             .getMsg(dest_tab.getTableName()));
       }
@@ -3333,7 +3387,7 @@ public class SemanticAnalyzer extends Ba
       throw new SemanticException("Unknown destination type: " + dest_type);
     }
 
-    input = genConversionSelectOperator(dest, qb, input, table_desc);
+    input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx);
     inputRR = opParseCtx.get(input).getRR();
 
     ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
@@ -3354,11 +3408,19 @@ public class SemanticAnalyzer extends Ba
 
     RowSchema fsRS = new RowSchema(vecCol);
 
-    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new FileSinkDesc(queryTmpdir, table_desc, conf
-        .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId,
-        rsCtx.isMultiFileSpray(), rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols()),
-        fsRS, input), inputRR);
+    Operator output = putOpInsertMap(
+        OperatorFactory.getAndMakeChild(
+            new FileSinkDesc(
+                queryTmpdir,
+                table_desc,
+                conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT),
+                currentTableId,
+                rsCtx.isMultiFileSpray(),
+                rsCtx.getNumFiles(),
+                rsCtx.getTotalFiles(),
+                rsCtx.getPartnCols(),
+                dpCtx),
+            fsRS, input), inputRR);
 
 
     if (ltd != null && SessionState.get() != null) {
@@ -3372,12 +3434,26 @@ public class SemanticAnalyzer extends Ba
     return output;
   }
 
+  private void validatePartSpec(Table tbl, Map<String, String> partSpec)
+      throws SemanticException {
+    List<FieldSchema> parts = tbl.getPartitionKeys();
+    Set<String> partCols = new HashSet<String>(parts.size());
+    for (FieldSchema col: parts) {
+      partCols.add(col.getName());
+    }
+    for (String col: partSpec.keySet()) {
+      if (!partCols.contains(col)) {
+        throw new SemanticException(ErrorMsg.NONEXISTPARTCOL.getMsg());
+      }
+    }
+  }
+
   /**
    * Generate the conversion SelectOperator that converts the columns into the
    * types that are expected by the table_desc.
    */
   Operator genConversionSelectOperator(String dest, QB qb, Operator input,
-      TableDesc table_desc) throws SemanticException {
+      TableDesc table_desc, DynamicPartitionCtx dpCtx) throws SemanticException {
     StructObjectInspector oi = null;
     try {
       Deserializer deserializer = table_desc.getDeserializerClass()
@@ -3390,13 +3466,20 @@ public class SemanticAnalyzer extends Ba
 
     // Check column number
     List<? extends StructField> tableFields = oi.getAllStructFieldRefs();
+    boolean dynPart = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING);
     ArrayList<ColumnInfo> rowFields = opParseCtx.get(input).getRR()
         .getColumnInfos();
     if (tableFields.size() != rowFields.size()) {
-      String reason = "Table " + dest + " has " + tableFields.size()
-          + " columns but query has " + rowFields.size() + " columns.";
-      throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
-          qb.getParseInfo().getDestForClause(dest), reason));
+      if (!dynPart || dpCtx == null ||
+          tableFields.size() + dpCtx.getNumDPCols() != rowFields.size()) {
+        String reason = "Table " + dest + " has " + tableFields.size()
+            + " columns but query has " + rowFields.size() + " columns.";
+        throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
+            qb.getParseInfo().getDestForClause(dest), reason));
+      } else {
+        // create the mapping from input ExprNode to dest table DP column
+        dpCtx.mapInputToDP(rowFields.subList(tableFields.size(), rowFields.size()));
+      }
     }
 
     // Check column types
@@ -3405,13 +3488,14 @@ public class SemanticAnalyzer extends Ba
     ArrayList<ExprNodeDesc> expressions = new ArrayList<ExprNodeDesc>(
         columnNumber);
     // MetadataTypedColumnsetSerDe does not need type conversions because it
-    // does
-    // the conversion to String by itself.
+    // does the conversion to String by itself.
     boolean isMetaDataSerDe = table_desc.getDeserializerClass().equals(
         MetadataTypedColumnsetSerDe.class);
     boolean isLazySimpleSerDe = table_desc.getDeserializerClass().equals(
         LazySimpleSerDe.class);
     if (!isMetaDataSerDe) {
+
+      // this loop only deals with non-partition columns; partition columns are handled next
       for (int i = 0; i < columnNumber; i++) {
         ObjectInspector tableFieldOI = tableFields.get(i)
             .getFieldObjectInspector();
@@ -3447,6 +3531,18 @@ public class SemanticAnalyzer extends Ba
       }
     }
 
+    // deal with dynamic partition columns: convert ExprNodeDesc type to String??
+    if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
+      // DP columns starts with tableFields.size()
+      for (int i = tableFields.size(); i < rowFields.size(); ++i ) {
+        TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
+        ExprNodeDesc column = new ExprNodeColumnDesc(
+            rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", false);
+        expressions.add(column);
+      }
+      // converted = true; // [TODO]: should we check & convert type to String and set it to true?
+    }
+
     if (converted) {
       // add the select operator
       RowResolver rowResolver = new RowResolver();
@@ -6321,9 +6417,14 @@ public class SemanticAnalyzer extends Ba
         if (cols.size() != 0) {
           throw new SemanticException(ErrorMsg.CTAS_COLLST_COEXISTENCE.getMsg());
         }
-        // TODO: support partition for CTAS?
         if (partCols.size() != 0 || bucketCols.size() != 0) {
-          throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
+          boolean dynPart = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING);
+          if (!dynPart) {
+            throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
+          } else {
+            // TODO: support dynamic partition for CTAS
+            throw new SemanticException(ErrorMsg.CTAS_PARCOL_COEXISTENCE.getMsg());
+          }
         }
         if (isExt) {
           throw new SemanticException(ErrorMsg.CTAS_EXTTBL_COEXISTENCE.getMsg());
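
One way to read the genConversionSelectOperator change above: without dynamic partitioning the SELECT must produce exactly the table's (non-partition) column count; with it, the SELECT may produce that count plus one trailing column per dynamic partition column, and those trailing columns feed the partitioning. A small sketch of just that arithmetic (names are this note's own):

    public class DPColumnCountSketch {
      static boolean columnsMatch(int tableCols, int queryCols,
          boolean dynPartEnabled, int numDPCols) {
        if (queryCols == tableCols) {
          return true;                              // plain INSERT, no DP columns
        }
        // any extra columns must be exactly the dynamic partition columns
        return dynPartEnabled && queryCols == tableCols + numDPCols;
      }

      public static void main(String[] args) {
        // table T(a, b) partitioned by (ds, hr), INSERT ... PARTITION (ds, hr)
        System.out.println(columnsMatch(2, 4, true, 2));  // true
        System.out.println(columnsMatch(2, 4, false, 2)); // false: DP disabled
        System.out.println(columnsMatch(2, 3, true, 2));  // false: column count off
      }
    }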

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java Wed Apr 14 23:36:07 2010
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+public class DynamicPartitionCtx implements Serializable {
+
+  /**
+   * default serialization ID
+   */
+  private static final long serialVersionUID = 1L;
+
+  private Map<String, String> partSpec; // partSpec is an ORDERED hash map
+  private int numDPCols;   // number of dynamic partition columns
+  private int numSPCols;   // number of static partition columns
+  private String spPath;   // path name corresponding to SP columns
+  private String rootPath; // the root path that the DP column directories start from
+  private int numBuckets;  // number of buckets in each partition
+
+  private Map<String, String> inputToDPCols; // mapping from input column names to DP columns
+
+  private List<String> spNames; // sp column names
+  private List<String> dpNames; // dp column names
+  private String defaultPartName; // default partition name in case of null or empty value
+  private int maxPartsPerNode;    // maximum dynamic partitions created per mapper/reducer
+
+  public DynamicPartitionCtx() {
+  }
+
+  public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defaultPartName,
+      int maxParts) {
+    this.partSpec = partSpec;
+    this.spNames = new ArrayList<String>();
+    this.dpNames = new ArrayList<String>();
+    this.numBuckets = 0;
+    this.maxPartsPerNode = maxParts;
+    this.defaultPartName = defaultPartName;
+
+    for (Map.Entry<String, String> me: partSpec.entrySet()) {
+      if (me.getValue() == null) {
+        dpNames.add(me.getKey());
+      } else {
+        spNames.add(me.getKey());
+      }
+    }
+    this.numDPCols = dpNames.size();
+    this.numSPCols = spNames.size();
+    this.inputToDPCols = new HashMap<String, String>();
+    if (this.numSPCols > 0) {
+      this.spPath = Warehouse.makeDynamicPartName(partSpec);
+    } else {
+      this.spPath = null;
+    }
+  }
+
+  public void mapInputToDP(List<ColumnInfo> fs) {
+    assert fs.size() == this.numDPCols : "input DP column size != numDPCols";
+
+    Iterator<ColumnInfo> itr1 = fs.iterator();
+    Iterator<String> itr2 = dpNames.iterator();
+
+    while (itr1.hasNext() && itr2.hasNext()) {
+      inputToDPCols.put(itr1.next().getInternalName(), itr2.next());
+    }
+  }
+
+  public int getMaxPartitionsPerNode() {
+    return this.maxPartsPerNode;
+  }
+
+  public void setMaxPartitionsPerNode(int maxParts) {
+    this.maxPartsPerNode = maxParts;
+  }
+
+  public String getDefaultPartitionName() {
+    return this.defaultPartName;
+  }
+
+  public void setDefaultPartitionName(String pname) {
+    this.defaultPartName = pname;
+  }
+
+  public void setNumBuckets(int bk) {
+    this.numBuckets = bk;
+  }
+
+  public int getNumBuckets() {
+    return this.numBuckets;
+  }
+
+  public void setRootPath(String root) {
+    this.rootPath = root;
+  }
+
+  public String getRootPath() {
+    return this.rootPath;
+  }
+
+  public List<String> getDPColNames() {
+    return this.dpNames;
+  }
+
+  public void setDPColNames(List<String> dp) {
+    this.dpNames = dp;
+  }
+
+  public List<String> getSPColNames() {
+    return this.spNames;
+  }
+
+  public void setPartSpec(Map<String, String> ps) {
+    this.partSpec = ps;
+  }
+
+  public Map<String, String> getPartSpec() {
+    return this.partSpec;
+  }
+
+  public void setSPColNames(List<String> sp) {
+    this.spNames = sp;
+  }
+
+  public Map<String, String> getInputToDPCols() {
+    return this.inputToDPCols;
+  }
+
+  public void setInputToDPCols(Map<String, String> map) {
+    this.inputToDPCols = map;
+  }
+
+  public void setNumDPCols(int dp) {
+    this.numDPCols = dp;
+  }
+
+  public int getNumDPCols() {
+    return this.numDPCols;
+  }
+
+  public void setNumSPCols(int sp) {
+    this.numSPCols = sp;
+  }
+
+  public int getNumSPCols() {
+    return this.numSPCols;
+  }
+
+  public void setSPPath(String sp) {
+    this.spPath = sp;
+  }
+
+  public String getSPPath() {
+    return this.spPath;
+  }
+}
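
The constructor above splits an ordered partition spec into static columns (entries with a
non-null value) and dynamic columns (entries mapped to null), and derives the static
sub-path via Warehouse.makeDynamicPartName. The following sketch, a hypothetical
DynamicPartitionCtxSketch class that is not part of this patch, exercises that behaviour;
the null Table argument (unused by the constructor body shown here), the example spec
values, the default-partition name and the max-partitions limit are placeholder
assumptions for illustration:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;

    public class DynamicPartitionCtxSketch {
      public static void main(String[] args) {
        // Ordered spec: ds is static (has a value), hr is dynamic (null value).
        Map<String, String> partSpec = new LinkedHashMap<String, String>();
        partSpec.put("ds", "2008-04-08");
        partSpec.put("hr", null);

        // The Table argument is not dereferenced by the constructor shown in this
        // patch, so a null placeholder is passed purely for illustration.
        DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(
            null, partSpec, "__HIVE_DEFAULT_PARTITION__", 100);

        System.out.println(dpCtx.getSPColNames()); // [ds]
        System.out.println(dpCtx.getDPColNames()); // [hr]
        System.out.println(dpCtx.getNumSPCols());  // 1
        System.out.println(dpCtx.getNumDPCols());  // 1
        System.out.println(dpCtx.getSPPath());     // e.g. ds=2008-04-08/
      }
    }

Because the spec is an ordered map with the static columns listed before the dynamic ones,
the static prefix of the partition path can be computed up front while the dynamic suffix
is only known at run time.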

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Wed Apr 14 23:36:07 2010
@@ -38,13 +38,15 @@ public class FileSinkDesc implements Ser
   private int     totalFiles;
   private ArrayList<ExprNodeDesc> partitionCols;
   private int     numFiles;
+  private DynamicPartitionCtx dpCtx; // dynamic partition context, set only for dynamic partition inserts
 
   public FileSinkDesc() {
   }
 
   public FileSinkDesc(final String dirName, final TableDesc tableInfo,
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
-      final int numFiles, final int totalFiles, final ArrayList<ExprNodeDesc> partitionCols) {
+      final int numFiles, final int totalFiles, final ArrayList<ExprNodeDesc> partitionCols,
+      final DynamicPartitionCtx dpCtx) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -54,6 +56,7 @@ public class FileSinkDesc implements Ser
     this.numFiles = numFiles;
     this.totalFiles = totalFiles;
     this.partitionCols = partitionCols;
+    this.dpCtx = dpCtx;
   }
 
   public FileSinkDesc(final String dirName, final TableDesc tableInfo,
@@ -179,4 +182,12 @@ public class FileSinkDesc implements Ser
   public void setNumFiles(int numFiles) {
     this.numFiles = numFiles;
   }
+
+  public void setDynPartCtx(DynamicPartitionCtx dpc) {
+    this.dpCtx = dpc;
+  }
+
+  public DynamicPartitionCtx getDynPartCtx() {
+    return this.dpCtx;
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java Wed Apr 14 23:36:07 2010
@@ -26,7 +26,7 @@ import java.util.Map;
 
 /**
  * Join operator Descriptor implementation.
- * 
+ *
  */
 @Explain(displayName = "Join Operator")
 public class JoinDesc implements Serializable {
@@ -49,7 +49,7 @@ public class JoinDesc implements Seriali
   private Map<Byte, List<ExprNodeDesc>> exprs;
 
   // used for create joinOutputObjectInspector
-  protected java.util.ArrayList<java.lang.String> outputColumnNames;
+  protected List<String> outputColumnNames;
 
   // key:column output name, value:tag
   private transient Map<String, Byte> reversedExprs;
@@ -66,7 +66,7 @@ public class JoinDesc implements Seriali
   }
 
   public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
-      ArrayList<String> outputColumnNames, final boolean noOuterJoin,
+      List<String> outputColumnNames, final boolean noOuterJoin,
       final JoinCondDesc[] conds) {
     this.exprs = exprs;
     this.outputColumnNames = outputColumnNames;
@@ -80,12 +80,12 @@ public class JoinDesc implements Seriali
   }
 
   public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
-      ArrayList<String> outputColumnNames) {
+      List<String> outputColumnNames) {
     this(exprs, outputColumnNames, true, null);
   }
 
   public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
-      ArrayList<String> outputColumnNames, final JoinCondDesc[] conds) {
+      List<String> outputColumnNames, final JoinCondDesc[] conds) {
     this(exprs, outputColumnNames, false, conds);
   }
 
@@ -150,12 +150,12 @@ public class JoinDesc implements Seriali
   }
 
   @Explain(displayName = "outputColumnNames")
-  public java.util.ArrayList<java.lang.String> getOutputColumnNames() {
+  public List<String> getOutputColumnNames() {
     return outputColumnNames;
   }
 
   public void setOutputColumnNames(
-      java.util.ArrayList<java.lang.String> outputColumnNames) {
+      List<String> outputColumnNames) {
     this.outputColumnNames = outputColumnNames;
   }
 
@@ -191,7 +191,7 @@ public class JoinDesc implements Seriali
 
   /**
    * The order in which tables should be processed when joining.
-   * 
+   *
    * @return Array of tags
    */
   public Byte[] getTagOrder() {
@@ -200,7 +200,7 @@ public class JoinDesc implements Seriali
 
   /**
    * The order in which tables should be processed when joining.
-   * 
+   *
    * @param tagOrder
    *          Array of tags
    */
@@ -215,7 +215,7 @@ public class JoinDesc implements Seriali
 
   /**
    * set to handle skew join in this join op.
-   * 
+   *
    * @param handleSkewJoin
    */
   public void setHandleSkewJoin(boolean handleSkewJoin) {
@@ -231,7 +231,7 @@ public class JoinDesc implements Seriali
 
   /**
    * set the mapping from tbl to dir for big keys.
-   * 
+   *
    * @param bigKeysDirMap
    */
   public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
@@ -247,7 +247,7 @@ public class JoinDesc implements Seriali
 
   /**
    * set the mapping from tbl to dir for small keys.
-   * 
+   *
    * @param bigKeysDirMap
    */
   public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
@@ -264,7 +264,7 @@ public class JoinDesc implements Seriali
 
   /**
    * set skew key definition.
-   * 
+   *
    * @param skewKeyDefinition
    */
   public void setSkewKeyDefinition(int skewKeyDefinition) {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java Wed Apr 14 23:36:07 2010
@@ -19,7 +19,8 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 /**
  * LoadTableDesc.
@@ -30,31 +31,49 @@ public class LoadTableDesc extends org.a
   private static final long serialVersionUID = 1L;
   private boolean replace;
   private String tmpDir;
+  private DynamicPartitionCtx dpCtx;
 
   // TODO: the below seems like they should just be combined into partitionDesc
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
-  private HashMap<String, String> partitionSpec;
+  private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be an ordered map
 
   public LoadTableDesc() {
   }
 
   public LoadTableDesc(final String sourceDir, final String tmpDir,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
-      final HashMap<String, String> partitionSpec, final boolean replace) {
-
+      final Map<String, String> partitionSpec, final boolean replace) {
     super(sourceDir);
-    this.tmpDir = tmpDir;
-    this.table = table;
-    this.partitionSpec = partitionSpec;
-    this.replace = replace;
+    init(sourceDir, tmpDir, table, partitionSpec, replace);
   }
 
   public LoadTableDesc(final String sourceDir, final String tmpDir,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
-      final HashMap<String, String> partitionSpec) {
+      final Map<String, String> partitionSpec) {
     this(sourceDir, tmpDir, table, partitionSpec, true);
   }
 
+  public LoadTableDesc(final String sourceDir, final String tmpDir,
+      final org.apache.hadoop.hive.ql.plan.TableDesc table,
+      final DynamicPartitionCtx dpCtx) {
+    super(sourceDir);
+    this.dpCtx = dpCtx;
+    if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
+      init(sourceDir, tmpDir, table, dpCtx.getPartSpec(), true);
+    } else {
+      init(sourceDir, tmpDir, table, new LinkedHashMap<String, String>(), true);
+    }
+  }
+
+  private void init(final String sourceDir, final String tmpDir,
+      final org.apache.hadoop.hive.ql.plan.TableDesc table,
+      final Map<String, String> partitionSpec, final boolean replace) {
+    this.tmpDir = tmpDir;
+    this.table = table;
+    this.partitionSpec = partitionSpec;
+    this.replace = replace;
+  }
+
   @Explain(displayName = "tmp directory", normalExplain = false)
   public String getTmpDir() {
     return tmpDir;
@@ -74,11 +93,11 @@ public class LoadTableDesc extends org.a
   }
 
   @Explain(displayName = "partition")
-  public HashMap<String, String> getPartitionSpec() {
+  public Map<String, String> getPartitionSpec() {
     return partitionSpec;
   }
 
-  public void setPartitionSpec(final HashMap<String, String> partitionSpec) {
+  public void setPartitionSpec(final Map<String, String> partitionSpec) {
     this.partitionSpec = partitionSpec;
   }
 
@@ -90,4 +109,12 @@ public class LoadTableDesc extends org.a
   public void setReplace(boolean replace) {
     this.replace = replace;
   }
+
+  public DynamicPartitionCtx getDPCtx() {
+    return dpCtx;
+  }
+
+  public void setDPCtx(final DynamicPartitionCtx dpCtx) {
+    this.dpCtx = dpCtx;
+  }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Wed Apr 14 23:36:07 2010
@@ -25,8 +25,8 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * Map Join operator Descriptor implementation.
@@ -67,7 +67,7 @@ public class MapJoinDesc extends JoinDes
 
   public MapJoinDesc(final Map<Byte, List<ExprNodeDesc>> keys,
       final TableDesc keyTblDesc, final Map<Byte, List<ExprNodeDesc>> values,
-      final List<TableDesc> valueTblDescs, ArrayList<String> outputColumnNames,
+      final List<TableDesc> valueTblDescs, List<String> outputColumnNames,
       final int posBigTable, final JoinCondDesc[] conds) {
     super(values, outputColumnNames, conds);
     this.keys = keys;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java Wed Apr 14 23:36:07 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 
@@ -37,6 +38,7 @@ public class MoveWork implements Seriali
   private LoadFileDesc loadFileWork;
 
   private boolean checkFileFormat;
+  ArrayList<String> dpSpecPaths; // dynamic partition spec paths -- the root directories of the DP columns
 
   /**
    * ReadEntitites that are passed to the hooks.
@@ -69,6 +71,14 @@ public class MoveWork implements Seriali
     this.checkFileFormat = checkFileFormat;
   }
 
+  public void setDPSpecPaths(ArrayList<String> dpsp) {
+    dpSpecPaths = dpsp;
+  }
+
+  public ArrayList<String> getDPSpecPaths() {
+    return dpSpecPaths;
+  }
+
   @Explain(displayName = "tables")
   public LoadTableDesc getLoadTableWork() {
     return loadTableWork;

Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,11 @@
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.max.dynamic.partitions=2;
+
+drop table dynamic_partition;
+create table dynamic_partition (key string) partitioned by (value string);
+
+insert overwrite table dynamic_partition partition(hr) select key, value from src;
+
+drop table dynamic_partition;
+

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,30 @@
+show partitions srcpart;
+
+drop table nzhang_part1;
+drop table nzhang_part2;
+
+create table if not exists nzhang_part1 like srcpart;
+create table if not exists nzhang_part2 like srcpart;
+describe extended nzhang_part1;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.dynamic.partition=true;
+
+explain
+from srcpart
+insert overwrite table nzhang_part1 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08'
+insert overwrite table nzhang_part2 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+from srcpart
+insert overwrite table nzhang_part1 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08'
+insert overwrite table nzhang_part2 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+
+show partitions nzhang_part1;
+show partitions nzhang_part2;
+
+select * from nzhang_part1 where ds is not null and hr is not null;
+select * from nzhang_part2 where ds is not null and hr is not null;
+
+drop table nzhang_part1;
+drop table nzhang_part2;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,24 @@
+show partitions srcpart;
+
+drop table nzhang_part10;
+
+create table if not exists nzhang_part10 like srcpart;
+describe extended nzhang_part10;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+explain
+from srcpart
+insert overwrite table nzhang_part10 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+from srcpart
+insert overwrite table nzhang_part10 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+
+show partitions nzhang_part10;
+
+select * from nzhang_part10 where ds is not null and hr is not null;
+
+drop table nzhang_part10;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,17 @@
+show partitions srcpart;
+
+drop table nzhang_part;
+create table if not exists nzhang_part like srcpart;
+describe extended nzhang_part;
+
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.exec.compress.output=true;
+set hive.exec.dynamic.partition=true;
+
+insert overwrite table nzhang_part partition (ds="2010-03-03", hr) select key, value, hr from srcpart where ds is not null and hr is not null;
+
+select * from nzhang_part where ds = '2010-03-03' and hr = '11';
+select * from nzhang_part where ds = '2010-03-03' and hr = '12';
+
+drop table nzhang_part;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,19 @@
+show partitions srcpart;
+
+drop table nzhang_part12;
+
+create table if not exists nzhang_part12 like srcpart;
+describe extended nzhang_part12;
+
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.exec.dynamic.partition=true;
+
+
+insert overwrite table nzhang_part12 partition (ds="2010-03-03", hr) select key, value, cast(hr*2 as int) from srcpart where ds is not null and hr is not null;
+
+show partitions nzhang_part12;
+
+select * from nzhang_part12 where ds is not null and hr is not null;
+
+drop table nzhang_part12;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,37 @@
+show partitions srcpart;
+
+drop table nzhang_part13;
+
+create table if not exists nzhang_part13 like srcpart;
+describe extended nzhang_part13;
+
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.exec.dynamic.partition=true;
+
+explain
+insert overwrite table nzhang_part13 partition (ds="2010-03-03", hr) 
+select * from (
+   select key, value, '22'
+   from src
+   where key < 20
+   union all 
+   select key, value, '33'
+   from src 
+   where key > 20 and key < 40) s;
+
+insert overwrite table nzhang_part13 partition (ds="2010-03-03", hr) 
+select * from (
+   select key, value, '22'
+   from src
+   where key < 20
+   union all 
+   select key, value, '33'
+   from src 
+   where key > 20 and key < 40) s;
+
+show partitions nzhang_part13;
+
+select * from nzhang_part13 where ds is not null and hr is not null;
+
+drop table nzhang_part13;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,23 @@
+drop table nzhang_part_bucket;
+create table if not exists nzhang_part_bucket (key string, value string) 
+  partitioned by (ds string, hr string) 
+  clustered by (key) into 10 buckets;
+
+describe extended nzhang_part_bucket;
+
+set hive.merge.mapfiles=false;
+set hive.enforce.bucketing=true;
+set hive.exec.dynamic.partition=true;
+
+explain
+insert overwrite table nzhang_part_bucket partition (ds='2010-03-23', hr) select key, value, hr from srcpart where ds is not null and hr is not null;
+
+insert overwrite table nzhang_part_bucket partition (ds='2010-03-23', hr) select key, value, hr from srcpart where ds is not null and hr is not null;
+
+show partitions nzhang_part_bucket;
+
+select * from nzhang_part_bucket where ds='2010-03-23' and hr='11' order by key;
+select * from nzhang_part_bucket where ds='2010-03-23' and hr='12' order by key;
+
+drop table nzhang_part_bucket;
+

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,19 @@
+show partitions srcpart;
+
+drop table nzhang_part3;
+
+create table if not exists nzhang_part3 like srcpart;
+describe extended nzhang_part3;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.dynamic.partition=true;
+
+explain
+insert overwrite table nzhang_part3 partition (ds, hr) select key, value, ds, hr from srcpart where ds is not null and hr is not null;
+
+insert overwrite table nzhang_part3 partition (ds, hr) select key, value, ds, hr from srcpart where ds is not null and hr is not null;
+
+select * from nzhang_part3 where ds is not null and hr is not null;
+
+drop table nzhang_part3;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,24 @@
+show partitions srcpart;
+
+drop table nzhang_part4;
+
+create table if not exists nzhang_part4 like srcpart;
+describe extended nzhang_part4;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+insert overwrite table nzhang_part4 partition (ds='2008-04-08', hr='existing_value') select key, value from src;
+
+explain
+insert overwrite table nzhang_part4 partition (ds, hr) select key, value, ds, hr from srcpart where ds is not null and hr is not null;
+
+insert overwrite table nzhang_part4 partition (ds, hr) select key, value, ds, hr from srcpart where ds is not null and hr is not null;
+
+show partitions nzhang_part4;
+select * from nzhang_part4 where ds='2008-04-08' and hr is not null;
+
+select * from nzhang_part4 where ds is not null and hr is not null;
+
+drop table nzhang_part4;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,22 @@
+drop table nzhang_part5;
+
+create table if not exists nzhang_part5 (key string) partitioned by (value string);
+describe extended nzhang_part5;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.max.dynamic.partitions=2000;
+set hive.exec.max.dynamic.partitions.pernode=2000;
+
+explain
+insert overwrite table nzhang_part5 partition (value) select key, value from src;
+
+insert overwrite table nzhang_part5 partition (value) select key, value from src;
+
+show partitions nzhang_part5;
+
+select * from nzhang_part5 where value='val_0';
+select * from nzhang_part5 where value='val_2';
+
+drop table nzhang_part5;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,16 @@
+show partitions srcpart;
+
+drop table nzhang_part6;
+
+create table if not exists nzhang_part6 like srcpart;
+describe extended nzhang_part6;
+
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+set hive.exec.dynamic.partition=true;
+
+insert overwrite table nzhang_part6 partition (ds="2010-03-03", hr) select key, value, hr from srcpart where ds is not null and hr is not null;
+
+select * from nzhang_part6 where ds = '2010-03-03' and hr = '11';
+select * from nzhang_part6 where ds = '2010-03-03' and hr = '12';
+drop table nzhang_part6;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,14 @@
+show partitions srcpart;
+
+drop table nzhang_part7;
+
+create table if not exists nzhang_part7 like srcpart;
+describe extended nzhang_part7;
+
+
+insert overwrite table nzhang_part7 partition (ds='2010-03-03', hr='12') select key, value from srcpart where ds = '2008-04-08' and hr = '12';
+
+show partitions nzhang_part7;
+
+select * from nzhang_part7 where ds is not null and hr is not null;
+drop table nzhang_part7;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,24 @@
+show partitions srcpart;
+
+drop table nzhang_part8;
+
+create table if not exists nzhang_part8 like srcpart;
+describe extended nzhang_part8;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+explain extended
+from srcpart
+insert overwrite table nzhang_part8 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08'
+insert overwrite table nzhang_part8 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+from srcpart
+insert overwrite table nzhang_part8 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08'
+insert overwrite table nzhang_part8 partition(ds='2008-12-31', hr) select key, value, hr where ds > '2008-04-08';
+
+show partitions nzhang_part8;
+
+select * from nzhang_part8 where ds is not null and hr is not null;
+drop table nzhang_part8;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q Wed Apr 14 23:36:07 2010
@@ -0,0 +1,23 @@
+show partitions srcpart;
+
+drop table nzhang_part9;
+
+create table if not exists nzhang_part9 like srcpart;
+describe extended nzhang_part9;
+
+set hive.merge.mapfiles=false;
+set hive.exec.dynamic.partition=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+explain
+from srcpart
+insert overwrite table nzhang_part9 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08';
+
+from srcpart
+insert overwrite table nzhang_part9 partition (ds, hr) select key, value, ds, hr where ds <= '2008-04-08';
+
+
+show partitions nzhang_part9;
+
+select * from nzhang_part9 where ds is not null and hr is not null;
+drop table nzhang_part9;

Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out Wed Apr 14 23:36:07 2010
@@ -0,0 +1,10 @@
+PREHOOK: query: drop table dynamic_partition
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table dynamic_partition
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table dynamic_partition (key string) partitioned by (value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table dynamic_partition (key string) partitioned by (value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@dynamic_partition
+FAILED: Error in semantic analysis: Partition column in the partition specification does not exist