You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2012/12/05 12:59:26 UTC

svn commit: r1417374 [1/11] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/ha...

Author: namit
Date: Wed Dec  5 11:59:15 2012
New Revision: 1417374

URL: http://svn.apache.org/viewvc?rev=1417374&view=rev
Log:
HIVE-3073 Hive List Bucketing - DML support
(Gang Tim Liu via namit)


Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ListBucketingCtx.java
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_1.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_2.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_3.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_4.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_5.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_6.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_7.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_8.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_dml_9.q
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_3.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_4.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_5.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_6.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_7.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_8.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_dml_9.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hive/trunk/ql/src/test/queries/clientnegative/column_change_skewedcol_type1.q
    hive/trunk/ql/src/test/queries/clientnegative/column_rename5.q
    hive/trunk/ql/src/test/queries/clientnegative/create_skewed_table_col_name_value_no_mismatch.q
    hive/trunk/ql/src/test/queries/clientnegative/create_skewed_table_dup_col_name.q
    hive/trunk/ql/src/test/queries/clientnegative/create_skewed_table_failure_invalid_col_name.q
    hive/trunk/ql/src/test/queries/clientnegative/invalid_config1.q
    hive/trunk/ql/src/test/queries/clientnegative/load_stored_as_dirs.q
    hive/trunk/ql/src/test/queries/clientpositive/alter_skewed_table.q
    hive/trunk/ql/src/test/queries/clientpositive/create_alter_list_bucketing_table1.q
    hive/trunk/ql/src/test/queries/clientpositive/create_skewed_table1.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_multiskew_1.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_multiskew_2.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_multiskew_3.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_oneskew_1.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_oneskew_2.q
    hive/trunk/ql/src/test/queries/clientpositive/list_bucket_query_oneskew_3.q
    hive/trunk/ql/src/test/results/clientnegative/invalid_config1.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_2.q.out
    hive/trunk/ql/src/test/results/clientpositive/list_bucket_query_multiskew_3.q.out
    hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java Wed Dec  5 11:59:15 2012
@@ -22,12 +22,11 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.BitSet;
 import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-
 import org.apache.hadoop.util.Shell;
 
 
@@ -121,6 +120,47 @@ public final class FileUtils {
     return name.toString();
   }
 
+  /**
+   * default directory will have the same depth as number of skewed columns
+   * this will make future operation easy like DML merge, concatenate merge
+   * @param skewedCols
+   * @param name
+   * @return
+   */
+  public static String makeDefaultListBucketingDirName(List<String> skewedCols,
+      String name) {
+    String lbDirName;
+    String defaultDir = FileUtils.escapePathName(name);
+    StringBuilder defaultDirPath = new StringBuilder();
+    for (int i = 0; i < skewedCols.size(); i++) {
+      if (i > 0) {
+        defaultDirPath.append(Path.SEPARATOR);
+      }
+      defaultDirPath.append(defaultDir);
+    }
+    lbDirName = defaultDirPath.toString();
+    return lbDirName;
+  }
+
+  /**
+   * Makes a valid list bucketing directory name.
+   * @param lbCols The skewed keys' names
+   * @param vals The skewed values
+   * @return An escaped, valid list bucketing directory name.
+   */
+  public static String makeListBucketingDirName(List<String> lbCols, List<String> vals) {
+    StringBuilder name = new StringBuilder();
+    for (int i = 0; i < lbCols.size(); i++) {
+      if (i > 0) {
+        name.append(Path.SEPARATOR);
+      }
+      name.append(escapePathName((lbCols.get(i)).toLowerCase()));
+      name.append('=');
+      name.append(escapePathName(vals.get(i)));
+    }
+    return name.toString();
+  }
+
   // NOTE: This is for generating the internal path name for partitions. Users
   // should always use the MetaStore API to get the path name for a partition.
   // Users should not directly take partition values and turn it into a path
@@ -157,7 +197,7 @@ public final class FileUtils {
     for (char c : clist) {
       charToEscape.set(c);
     }
-    
+
     if(Shell.WINDOWS){
       //On windows, following chars need to be escaped as well
       char [] winClist = {' ', '<','>','|'};
@@ -257,27 +297,4 @@ public final class FileUtils {
     }
   }
 
-  /**
-   * default directory will have the same depth as number of skewed columns
-   * this will make future operation easy like DML merge, concatenate merge
-   * @param skewedCols
-   * @param hconf
-   * @return
-   */
-  public static String makeDefaultListBucketingDirName(List<String> skewedCols,
-      Configuration hconf) {
-    String lbDirName;
-    String defaultDir = FileUtils.escapePathName(HiveConf.getVar(hconf,
-        HiveConf.ConfVars.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME));
-    StringBuilder defaultDirPath = new StringBuilder();
-    for (int i = 0; i < skewedCols.size(); i++) {
-      if (i > 0) {
-        defaultDirPath.append(Path.SEPARATOR);
-      }
-      defaultDirPath.append(defaultDir);
-    }
-    lbDirName = defaultDirPath.toString();
-    return lbDirName;
-  }
-
 }

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Dec  5 11:59:15 2012
@@ -658,15 +658,17 @@ public class HiveConf extends Configurat
         "hive.multi.insert.move.tasks.share.dependencies", false),
 
     /* The following section contains all configurations used for list bucketing feature.*/
-    // Enable list bucketing DDL. Default value is false so that we disable it by default.
-    // This will be removed once the rest of the DML changes are committed.
-    HIVE_INTERNAL_DDL_LIST_BUCKETING_ENABLE("hive.internal.ddl.list.bucketing.enable", false),
-
-    // Default list bucketing directory name.
-    HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME("hive.exec.list.bucketing.default.dir",
-        "HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME"),
+    /* This is not for clients. but only for block merge task. */
+    /* This is used by BlockMergeTask to send out flag to RCFileMergeMapper */
+    /* about alter table...concatenate and list bucketing case. */
+    HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING(
+        "hive.merge.current.job.concatenate.list.bucketing", true),
+    /* This is not for clients. but only for block merge task. */
+    /* This is used by BlockMergeTask to send out flag to RCFileMergeMapper */
+    /* about depth of list bucketing. */
+    HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH(
+            "hive.merge.current.job.concatenate.list.bucketing.depth", 0),
     // Enable list bucketing optimizer. Default value is false so that we disable it by default.
-    // This will be removed once the rest of the DML changes are committed.
     HIVEOPTLISTBUCKETING("hive.optimize.listbucketing", false),
 
     // Allow TCP Keep alive socket option for for HiveServer or a maximum timeout for the socket.

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Wed Dec  5 11:59:15 2012
@@ -537,23 +537,6 @@
 </property>
 
 <property>
-  <name>hive.exec.list.bucketing.default.dir</name>
-  <value>HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME</value>
-  <description>Default directory name used in list bucketing.
-    List bucketing feature will create sub-directory for each skewed-value and a default directory
-    for non-skewed value. This config specifies the default name for the default directory.
-    Sub-directory is created by list bucketing DML and under partition directory. User doesn't need 
-    to know how to construct the canonical path. It just gives user choice if they want to change 
-    the default directory name.
-    For example, there are 2 skewed column c1 and c2. 2 skewed value: (1,a) and (2,b). subdirectory:
-    <partition-dir>/c1=1/c2=a/
-    <partition-dir>/c1=2/c2=b/
-    <partition-dir>/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
-    Note: This config won't impact users if they don't list bucketing.
-</description>
-</property>
-
-<property>
   <name>hive.skewjoin.key</name>
   <value>100000</value>
   <description>Determine if we get a skew key in join. If we see more

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Dec  5 11:59:15 2012
@@ -969,8 +969,7 @@ public class Driver implements CommandPr
   private boolean validateConfVariables() {
     boolean valid = true;
     if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES))
-        && ((conf.getBoolVar(HiveConf.ConfVars.HIVE_INTERNAL_DDL_LIST_BUCKETING_ENABLE))
-            || (conf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE)) || (conf
+        && ((conf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE)) || (conf
               .getBoolVar(HiveConf.ConfVars.HIVEOPTLISTBUCKETING)))) {
       errorMessage = "FAILED: Hive Internal Error: "
           + ErrorMsg.SUPPORT_DIR_MUST_TRUE_FOR_LIST_BUCKETING.getMsg();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Wed Dec  5 11:59:15 2012
@@ -252,7 +252,7 @@ public enum ErrorMsg {
   SUPPORT_DIR_MUST_TRUE_FOR_LIST_BUCKETING(
       10199,
       "hive.mapred.supports.subdirectories must be true"
-          + " if any one of following is true: hive.internal.ddl.list.bucketing.enable,"
+          + " if any one of following is true: "
           + " hive.optimize.listbucketing and mapred.input.dir.recursive"),
   SKEWED_TABLE_NO_COLUMN_NAME(10200, "No skewed column name."),
   SKEWED_TABLE_NO_COLUMN_VALUE(10201, "No skewed values."),
@@ -270,9 +270,6 @@ public enum ErrorMsg {
   ALTER_TABLE_NOT_ALLOWED_RENAME_SKEWED_COLUMN(10207,
       " is a skewed column. It's not allowed to rename skewed column"
           + " or change skewed column type."),
- HIVE_INTERNAL_DDL_LIST_BUCKETING_DISABLED(10208,
-              "List Bucketing DDL is not allowed to use since feature is not completed yet."),
-
   HIVE_GROUPING_SETS_AGGR_NOMAPAGGR(10209,
     "Grouping sets aggregations (with rollups or cubes) are not allowed if map-side " +
     " aggregation is turned off. Set hive.map.aggr=true if you want to use grouping sets"),

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Dec  5 11:59:15 2012
@@ -464,6 +464,9 @@ public class DDLTask extends Task<DDLWor
     // merge work only needs input and output.
     MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
         mergeFilesDesc.getOutputDir());
+    mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
+    mergeWork.resolveConcatenateMerge(db.getConf());
+    mergeWork.setMapperCannotSpanPartns(true);
     DriverContext driverCxt = new DriverContext();
     BlockMergeTask taskExec = new BlockMergeTask();
     taskExec.initialize(db.getConf(), null, driverCxt);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Dec  5 11:59:15 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -83,8 +85,11 @@ public class FileSinkOperator extends Te
   protected transient List<Object> dpWritables;
   protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
   protected transient int maxPartitions;
+  protected transient ListBucketingCtx lbCtx;
+  protected transient boolean isSkewedStoredAsSubDirectories;
   private transient boolean statsCollectRawDataSize;
 
+
   private static final transient String[] FATAL_ERR_MSG = {
       null, // counter value 0 means no error
       "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions.pernode."
@@ -308,6 +313,7 @@ public class FileSinkOperator extends Te
       totalFiles = conf.getTotalFiles();
       numFiles = conf.getNumFiles();
       dpCtx = conf.getDynPartCtx();
+      lbCtx = conf.getLbCtx();
       valToPaths = new HashMap<String, FSPaths>();
       taskId = Utilities.getTaskId(hconf);
       initializeSpecPath();
@@ -357,13 +363,19 @@ public class FileSinkOperator extends Te
         dpSetup();
       }
 
+      if (lbCtx != null) {
+        lbSetup();
+      }
+
       if (!bDynParts) {
         fsp = new FSPaths(specPath);
 
         // Create all the files - this is required because empty files need to be created for
         // empty buckets
         // createBucketFiles(fsp);
-        valToPaths.put("", fsp); // special entry for non-DP case
+        if (!this.isSkewedStoredAsSubDirectories) {
+          valToPaths.put("", fsp); // special entry for non-DP case
+        }
       }
       initializeChildren(hconf);
     } catch (HiveException e) {
@@ -375,6 +387,13 @@ public class FileSinkOperator extends Te
   }
 
   /**
+   * Initialize list bucketing information
+   */
+  private void lbSetup() {
+    this.isSkewedStoredAsSubDirectories =  ((lbCtx == null) ? false : lbCtx.isSkewedStoredAsDir());
+  }
+
+  /**
    * Set up for dynamic partitioning including a new ObjectInspector for the output row.
    */
   private void dpSetup() {
@@ -468,7 +487,7 @@ public class FileSinkOperator extends Te
           // we create.
           String extension = Utilities.getFileExtension(jc, isCompressed,
               hiveOutputFormat);
-          if (!bDynParts) {
+          if (!bDynParts && !this.isSkewedStoredAsSubDirectories) {
             fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension);
           } else {
             fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
@@ -539,8 +558,18 @@ public class FileSinkOperator extends Te
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
+    /* Create list bucketing sub-directory only if stored-as-directories is on. */
+    String lbDirName = null;
+    lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
+
+    FSPaths fpaths;
+
     if (!bDynParts && !filesCreated) {
-      createBucketFiles(fsp);
+      if (lbDirName != null) {
+        FSPaths fsp2 = lookupListBucketingPaths(lbDirName);
+      } else {
+        createBucketFiles(fsp);
+      }
     }
 
     // Since File Sink is a terminal operator, forward is not called - so,
@@ -559,8 +588,6 @@ public class FileSinkOperator extends Te
       // if DP is enabled, get the final output writers and prepare the real output row
       assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct";
 
-      FSPaths fpaths;
-
       if (bDynParts) {
         // copy the DP column values from the input row to dpVals
         dpVals.clear();
@@ -578,10 +605,14 @@ public class FileSinkOperator extends Te
         }
         // use SubStructObjectInspector to serialize the non-partitioning columns in the input row
         recordValue = serializer.serialize(row, subSetOI);
-        fpaths = getDynOutPaths(dpVals);
+        fpaths = getDynOutPaths(dpVals, lbDirName);
 
       } else {
-        fpaths = fsp;
+        if (lbDirName != null) {
+          fpaths = lookupListBucketingPaths(lbDirName);
+        } else {
+          fpaths = fsp;
+        }
         // use SerDe to serialize r, and write it out
         recordValue = serializer.serialize(row, inputObjInspectors[0]);
       }
@@ -623,7 +654,91 @@ public class FileSinkOperator extends Te
     }
   }
 
-  private FSPaths getDynOutPaths(List<String> row) throws HiveException {
+  /**
+   * Lookup list bucketing path.
+   * @param lbDirName
+   * @return
+   * @throws HiveException
+   */
+  private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
+    FSPaths fsp2 = valToPaths.get(lbDirName);
+    if (fsp2 == null) {
+      fsp2 = createNewPaths(lbDirName);
+    }
+    return fsp2;
+  }
+
+  /**
+   * create new path.
+   *
+   * @param dirName
+   * @return
+   * @throws HiveException
+   */
+  private FSPaths createNewPaths(String dirName) throws HiveException {
+    FSPaths fsp2 = new FSPaths(specPath);
+    if (childSpecPathDynLinkedPartitions != null) {
+      fsp2.tmpPath = new Path(fsp2.tmpPath,
+          dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
+      fsp2.taskOutputTempPath =
+        new Path(fsp2.taskOutputTempPath,
+            dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
+    } else {
+      fsp2.tmpPath = new Path(fsp2.tmpPath, dirName);
+      fsp2.taskOutputTempPath =
+        new Path(fsp2.taskOutputTempPath, dirName);
+    }
+    createBucketFiles(fsp2);
+    valToPaths.put(dirName, fsp2);
+    return fsp2;
+  }
+
+  /**
+   * Generate list bucketing directory name from a row.
+   * @param row row to process.
+   * @return directory name.
+   */
+  private String generateListBucketingDirName(Object row) {
+    if (!this.isSkewedStoredAsSubDirectories) {
+      return null;
+    }
+
+    String lbDirName = null;
+    List<Object> standObjs = new ArrayList<Object>();
+    List<String> skewedCols = lbCtx.getSkewedColNames();
+    List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
+    List<String> skewedValsCandidate = null;
+    Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
+
+    /* Convert input row to standard objects. */
+    ObjectInspectorUtils.copyToStandardObject(standObjs, row,
+        (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
+
+    assert (standObjs.size() >= skewedCols.size()) :
+      "The row has less number of columns than no. of skewed column.";
+
+    skewedValsCandidate = new ArrayList<String>(skewedCols.size());
+    for (int index : lbCtx.getRowSkewedIndex()) {
+      skewedValsCandidate.add(index, standObjs.get(index).toString());
+    }
+    /* The row matches skewed column names. */
+    if (allSkewedVals.contains(skewedValsCandidate)) {
+      /* matches skewed values. */
+      lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
+      locationMap.put(skewedValsCandidate, lbDirName);
+    } else {
+      /* create default directory. */
+      lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
+          lbCtx.getDefaultDirName());
+      List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
+      if (!locationMap.containsKey(defaultKey)) {
+        locationMap.put(defaultKey, lbDirName);
+      }
+    }
+    return lbDirName;
+  }
+
+  private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
 
     FSPaths fp;
 
@@ -631,6 +746,7 @@ public class FileSinkOperator extends Te
     String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
 
     if (dpDir != null) {
+      dpDir = appendListBucketingDirName(lbDirName, dpDir);
       FSPaths fsp2 = valToPaths.get(dpDir);
 
       if (fsp2 == null) {
@@ -641,20 +757,7 @@ public class FileSinkOperator extends Te
           fatalError = true;
           LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions");
         }
-        fsp2 = new FSPaths(specPath);
-        if (childSpecPathDynLinkedPartitions != null) {
-          fsp2.tmpPath = new Path(fsp2.tmpPath,
-            dpDir + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
-          fsp2.taskOutputTempPath =
-            new Path(fsp2.taskOutputTempPath,
-              dpDir + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
-        } else {
-          fsp2.tmpPath = new Path(fsp2.tmpPath, dpDir);
-          fsp2.taskOutputTempPath =
-            new Path(fsp2.taskOutputTempPath, dpDir);
-        }
-        createBucketFiles(fsp2);
-        valToPaths.put(dpDir, fsp2);
+        fsp2 = createNewPaths(dpDir);
       }
       fp = fsp2;
     } else {
@@ -663,6 +766,20 @@ public class FileSinkOperator extends Te
     return fp;
   }
 
+  /**
+   * Append list bucketing dir name to original dir name.
+   * Skewed columns cannot be partitioned columns.
+   * @param lbDirName
+   * @param dpDir
+   * @return
+   */
+  private String appendListBucketingDirName(String lbDirName, String dpDir) {
+    StringBuilder builder = new StringBuilder(dpDir);
+    dpDir = (lbDirName == null) ? dpDir : builder.append(Path.SEPARATOR).append(lbDirName)
+          .toString();
+    return dpDir;
+  }
+
   // given the current input row, the mapping for input col info to dp columns, and # of dp cols,
   // return the relative path corresponding to the row.
   // e.g., ds=2008-04-08/hr=11
@@ -866,10 +983,7 @@ public class FileSinkOperator extends Te
       } else {
         // for partitioned table, the key is
         // common key prefix + static partition spec + DynamicPartSpec + taskID
-        String keyPrefix = Utilities.getHashedStatsPrefix(
-            conf.getStatsAggPrefix() + spSpec + fspKey + Path.SEPARATOR,
-            conf.getMaxStatsKeyPrefixLength());
-        key = keyPrefix + taskID;
+        key = createKeyForStatsPublisher(taskID, spSpec, fspKey);
       }
       Map<String, String> statsToPublish = new HashMap<String, String>();
       for (String statType : fspValue.stat.getStoredStats()) {
@@ -891,4 +1005,64 @@ public class FileSinkOperator extends Te
       }
     }
   }
+
+  /**
+   * This is server side code to create key in order to save statistics to stats database.
+   * Client side will read it via StatsTask.java aggregateStats().
+   * Client side reads it via db query prefix which is based on partition spec.
+   * Since store-as-subdir information is not part of partition spec, we have to
+   * remove store-as-subdir information from variable "keyPrefix" calculation.
+   * But we have to keep store-as-subdir information in variable "key" calculation
+   * since each skewed value has a row in stats db and "key" is db key,
+   * otherwise later value overwrites previous value.
+   * Performance impact due to string handling is minimum since this method is
+   * only called once in FileSinkOperator closeOp().
+   * For example,
+   * create table test skewed by (key, value) on (('484','val_484') stored as DIRECTORIES;
+   * skewedValueDirList contains 2 elements:
+   * 1. key=484/value=val_484
+   * 2. HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+   * Case #1: Static partition with store-as-sub-dir
+   * spSpec has SP path
+   * fspKey has either
+   * key=484/value=val_484 or
+   * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+   * After filter, fspKey is empty, storedAsDirPostFix has either
+   * key=484/value=val_484 or
+   * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+   * so, at the end, "keyPrefix" doesnt have subdir information but "key" has
+   * Case #2: Dynamic partition with store-as-sub-dir. Assume dp part is hr
+   * spSpec has SP path
+   * fspKey has either
+   * hr=11/key=484/value=val_484 or
+   * hr=11/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+   * After filter, fspKey is hr=11, storedAsDirPostFix has either
+   * key=484/value=val_484 or
+   * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
+   * so, at the end, "keyPrefix" doesn't have subdir information from skewed but "key" has
+   * @param taskID
+   * @param spSpec
+   * @param fspKey
+   * @return
+   */
+  private String createKeyForStatsPublisher(String taskID, String spSpec, String fspKey) {
+    String key;
+    String newFspKey = fspKey;
+    String storedAsDirPostFix = "";
+    if (isSkewedStoredAsSubDirectories) {
+      List<String> skewedValueDirList = this.lbCtx.getSkewedValuesDirNames();
+      for (String dir : skewedValueDirList) {
+        newFspKey = newFspKey.replace(dir, "");
+        if (!newFspKey.equals(fspKey)) {
+          storedAsDirPostFix = dir;
+          break;
+        }
+      }
+    }
+    String keyPrefix = Utilities.getHashedStatsPrefix(
+        conf.getStatsAggPrefix() + spSpec + newFspKey + Path.SEPARATOR,
+        conf.getMaxStatsKeyPrefixLength());
+    key = keyPrefix + storedAsDirPostFix + taskID;
+    return key;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Wed Dec  5 11:59:15 2012
@@ -37,8 +37,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -263,7 +263,8 @@ public class MoveTask extends Task<MoveW
                 	tbd.getPartitionSpec(),
                 	tbd.getReplace(),
                 	dpCtx.getNumDPCols(),
-                	tbd.getHoldDDLTime());
+                	tbd.getHoldDDLTime(),
+                	isSkewedStoredAsDirs(tbd));
 
             if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
               throw new HiveException("This query creates no partitions." +
@@ -302,7 +303,8 @@ public class MoveTask extends Task<MoveW
             dc = null; // reset data container to prevent it being added again.
           } else { // static partitions
             db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
-                tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(), tbd.getInheritTableSpecs());
+                tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),
+                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd));
           	Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
           	dc = new DataContainer(table.getTTable(), partn.getTPartition());
           	// add this partition to post-execution hook
@@ -325,6 +327,11 @@ public class MoveTask extends Task<MoveW
     }
   }
 
+  private boolean isSkewedStoredAsDirs(LoadTableDesc tbd) {
+    return (tbd.getLbCtx() == null) ? false : tbd.getLbCtx()
+        .isSkewedStoredAsDir();
+  }
+
   /*
    * Does the move task involve moving to a local file system
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Wed Dec  5 11:59:15 2012
@@ -395,7 +395,10 @@ public class StatsTask extends Task<Stat
           }
 
           fileSys = partn.getPartitionPath().getFileSystem(conf);
-          fileStatus = Utilities.getFileStatusRecurse(partn.getPartitionPath(), 1, fileSys);
+          /* consider sub-directory created from list bucketing. */
+          int listBucketingDepth = calculateListBucketingDMLDepth(partn);
+          fileStatus = Utilities.getFileStatusRecurse(partn.getPartitionPath(),
+              (1 + listBucketingDepth), fileSys);
           newPartStats.setStat(StatsSetupConst.NUM_FILES, fileStatus.length);
 
           long partitionSize = 0L;
@@ -470,6 +473,28 @@ public class StatsTask extends Task<Stat
     return ret;
   }
 
+  /**
+   * List bucketing will introduce sub-directories.
+   *
+   * calculate it here in order to go to the leaf directory
+   *
+   * so that we can count right number of files.
+   *
+   * @param partn
+   * @return
+   */
+  private int calculateListBucketingDMLDepth(Partition partn) {
+    // list bucketing will introduce more files
+    int listBucketingDepth = 0;
+    if ((partn.getSkewedColNames() != null) && (partn.getSkewedColNames().size() > 0)
+        && (partn.getSkewedColValues() != null) && (partn.getSkewedColValues().size() > 0)
+        && (partn.getSkewedColValueLocationMaps() != null)
+        && (partn.getSkewedColValueLocationMaps().size() > 0)) {
+      listBucketingDepth = partn.getSkewedColNames().size();
+    }
+    return listBucketingDepth;
+  }
+
   private boolean existStats(Map<String, String> parameters) {
     return parameters.containsKey(StatsSetupConst.ROW_COUNT)
         || parameters.containsKey(StatsSetupConst.NUM_FILES)

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Wed Dec  5 11:59:15 2012
@@ -161,6 +161,16 @@ public class BlockMergeTask extends Task
         HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS,
         work.hasDynamicPartitions());
 
+    HiveConf.setBoolVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING,
+        work.isListBucketingAlterTableConcatenate());
+
+    HiveConf.setIntVar(
+        job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH,
+        ((work.getListBucketingCtx() == null) ? 0 : work.getListBucketingCtx()
+            .calculateListBucketingLevel()));
+
     int returnVal = 0;
     RunningJob rj = null;
     boolean noName = StringUtils.isEmpty(HiveConf.getVar(job,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Wed Dec  5 11:59:15 2012
@@ -18,16 +18,21 @@
 
 package org.apache.hadoop.hive.ql.io.rcfile.merge;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -43,6 +48,8 @@ public class MergeWork extends MapredWor
   private String outputDir;
   private boolean hasDynamicPartitions;
   private DynamicPartitionCtx dynPartCtx;
+  private boolean isListBucketingAlterTableConcatenate;
+  private ListBucketingCtx listBucketingCtx;
 
   public MergeWork() {
   }
@@ -115,7 +122,7 @@ public class MergeWork extends MapredWor
   }
 
   @Override
-  public void resolveDynamicPartitionMerge(HiveConf conf, Path path,
+  public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path,
       TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) {
 
     String inputFormatClass = conf.getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
@@ -126,12 +133,56 @@ public class MergeWork extends MapredWor
       String msg = "Merge input format class not found";
       throw new RuntimeException(msg);
     }
-    super.resolveDynamicPartitionMerge(conf, path, tblDesc, aliases, partDesc);
+    super.resolveDynamicPartitionStoredAsSubDirsMerge(conf, path, tblDesc, aliases, partDesc);
 
     // Add the DP path to the list of input paths
     inputPaths.add(path.toString());
   }
 
+  /**
+   * alter table ... concatenate
+   *
+   * If it is skewed table, use subdirectories in inputpaths.
+   */
+  public void resolveConcatenateMerge(HiveConf conf) {
+    isListBucketingAlterTableConcatenate = ((listBucketingCtx == null) ? false : listBucketingCtx
+        .isSkewedStoredAsDir());
+    if (isListBucketingAlterTableConcatenate) {
+      // use sub-dir as inputpath.
+      assert ((this.inputPaths != null) && (this.inputPaths.size() == 1)) :
+        "alter table ... concatenate should only have one directory inside inputpaths";
+      String dirName = inputPaths.get(0);
+      Path dirPath = new Path(dirName);
+      try {
+        FileSystem inpFs = dirPath.getFileSystem(conf);
+        FileStatus[] status = Utilities.getFileStatusRecurse(dirPath, listBucketingCtx
+            .getSkewedColNames().size(), inpFs);
+        List<String> newInputPath = new ArrayList<String>();
+        boolean succeed = true;
+        for (int i = 0; i < status.length; ++i) {
+           if (status[i].isDir()) {
+             // Add the lb path to the list of input paths
+             newInputPath.add(status[i].getPath().toString());
+           } else {
+             // find file instead of dir. dont change inputpath
+             succeed = false;
+           }
+        }
+        assert (succeed || ((!succeed) && newInputPath.isEmpty())) : "This partition has "
+            + " inconsistent file structure: "
+            + "it is stored-as-subdir and expected all files in the same depth of subdirectories.";
+        if (succeed) {
+          inputPaths.clear();
+          inputPaths.addAll(newInputPath);
+        }
+      } catch (IOException e) {
+        String msg = "Fail to get filesystem for directory name : " + dirName;
+        throw new RuntimeException(msg, e);
+      }
+
+    }
+  }
+
   public DynamicPartitionCtx getDynPartCtx() {
     return dynPartCtx;
   }
@@ -140,4 +191,25 @@ public class MergeWork extends MapredWor
     this.dynPartCtx = dynPartCtx;
   }
 
+  /**
+   * @return the listBucketingCtx
+   */
+  public ListBucketingCtx getListBucketingCtx() {
+    return listBucketingCtx;
+  }
+
+  /**
+   * @param listBucketingCtx the listBucketingCtx to set
+   */
+  public void setListBucketingCtx(ListBucketingCtx listBucketingCtx) {
+    this.listBucketingCtx = listBucketingCtx;
+  }
+
+  /**
+   * @return the isListBucketingAlterTableConcatenate
+   */
+  public boolean isListBucketingAlterTableConcatenate() {
+    return isListBucketingAlterTableConcatenate;
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Wed Dec  5 11:59:15 2012
@@ -60,6 +60,10 @@ public class RCFileMergeMapper extends M
   int columnNumber = 0;
 
   boolean hasDynamicPartitions = false;
+  boolean isListBucketingDML = false;
+  boolean isListBucketingAlterTableConcatenate = false;
+  int listBucketingDepth; // used as depth for dir-calculation and if it is list bucketing case.
+  boolean tmpPathFixedConcatenate = false;
   boolean tmpPathFixed = false;
   Path tmpPath;
   Path taskTmpPath;
@@ -75,6 +79,10 @@ public class RCFileMergeMapper extends M
     jc = job;
     hasDynamicPartitions = HiveConf.getBoolVar(job,
         HiveConf.ConfVars.HIVEMERGECURRENTJOBHASDYNAMICPARTITIONS);
+    isListBucketingAlterTableConcatenate = HiveConf.getBoolVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETING);
+    listBucketingDepth = HiveConf.getIntVar(job,
+        HiveConf.ConfVars.HIVEMERGECURRENTJOBCONCATENATELISTBUCKETINGDEPTH);
 
     String specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job)
         .toString();
@@ -112,13 +120,33 @@ public class RCFileMergeMapper extends M
         key = (RCFileKeyBufferWrapper) k;
       }
 
-      if (hasDynamicPartitions) {
-        if (tmpPathFixed) {
+      /**
+       * 1. boolean isListBucketingAlterTableConcatenate will be true only if it is alter table ...
+       * concatenate on stored-as-dir so it will handle list bucketing alter table merge in the if
+       * cause with the help of fixTmpPathConcatenate
+       * 2. If it is DML, isListBucketingAlterTableConcatenate will be false so that it will be
+       * handled by else cause. In this else cause, we have another if check.
+       * 2.1 the if check will make sure DP or LB, we will fix path with the help of fixTmpPath(..).
+       * Since both has sub-directories. it includes SP + LB.
+       * 2.2 only SP without LB, we dont fix path.
+       */
+      // Fix temp path for alter table ... concatenate
+      if (isListBucketingAlterTableConcatenate) {
+        if (this.tmpPathFixedConcatenate) {
           checkPartitionsMatch(key.inputPath.getParent());
         } else {
-          // We haven't fixed the TMP path for this mapper yet
-          fixTmpPath(key.inputPath.getParent());
-          tmpPathFixed = true;
+          fixTmpPathConcatenate(key.inputPath.getParent());
+          tmpPathFixedConcatenate = true;
+        }
+      } else {
+        if (hasDynamicPartitions || (listBucketingDepth > 0)) {
+          if (tmpPathFixed) {
+            checkPartitionsMatch(key.inputPath.getParent());
+          } else {
+            // We haven't fixed the TMP path for this mapper yet
+            fixTmpPath(key.inputPath.getParent());
+            tmpPathFixed = true;
+          }
         }
       }
 
@@ -166,7 +194,18 @@ public class RCFileMergeMapper extends M
   /**
    * Fixes tmpPath to point to the correct partition.
    * Before this is called, tmpPath will default to the root tmp table dir
-   *
+   * fixTmpPath(..) works for DP + LB + multiple skewed values + merge. reason:
+   * 1. fixTmpPath(..) compares inputPath and tmpDepth, find out path difference and put it into
+   * newPath. Then add newpath to existing this.tmpPath and this.taskTmpPath.
+   * 2. The path difference between inputPath and tmpDepth can be DP or DP+LB. It will automatically
+   * handle it.
+   * 3. For example,
+   * if inputpath is <prefix>/-ext-10002/hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/
+   * HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
+   * tmppath is <prefix>/_tmp.-ext-10000
+   * newpath will be hr=a1/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME/HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME
+   * Then, this.tmpPath and this.taskTmpPath will be update correctly.
+   * We have list_bucket_dml_6.q cover this case: DP + LP + multiple skewed values + merge.
    * @param inputPath
    * @throws HiveException
    * @throws IOException
@@ -193,6 +232,48 @@ public class RCFileMergeMapper extends M
     updatePaths(newTmpPath, newTaskTmpPath);
   }
 
+  /**
+   * Fixes tmpPath to point to the correct list bucketing sub-directories.
+   * Before this is called, tmpPath will default to the root tmp table dir
+   * Reason to add a new method instead of changing fixTmpPath()
+   * Reason 1: logic has slightly difference
+   * fixTmpPath(..) needs 2 variables in order to decide path delta which is in variable newPath.
+   * 1. inputPath.depth()
+   * 2. tmpPath.depth()
+   * fixTmpPathConcatenate needs 2 variables too but one of them is different from fixTmpPath(..)
+   * 1. inputPath.depth()
+   * 2. listBucketingDepth
+   * Reason 2: less risks
+   * The existing logic is a little not trivial around map() and fixTmpPath(). In order to ensure
+   * minimum impact on existing flow, we try to avoid change on existing code/flow but add new code
+   * for new feature.
+   *
+   * @param inputPath
+   * @throws HiveException
+   * @throws IOException
+   */
+  private void fixTmpPathConcatenate(Path inputPath)
+      throws HiveException, IOException {
+    dpPath = inputPath;
+    Path newPath = new Path(".");
+
+    int depth = listBucketingDepth;
+    // Build the path from bottom up. pick up list bucketing subdirectories
+    while ((inputPath != null) && (depth > 0)) {
+      newPath = new Path(inputPath.getName(), newPath);
+      inputPath = inputPath.getParent();
+      depth--;
+    }
+
+    Path newTmpPath = new Path(tmpPath, newPath);
+    Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+    if (!fs.exists(newTmpPath)) {
+      fs.mkdirs(newTmpPath);
+    }
+    updatePaths(newTmpPath, newTaskTmpPath);
+  }
+
+
   @Override
   public void close() throws IOException {
     // close writer

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Wed Dec  5 11:59:15 2012
@@ -28,6 +28,7 @@ import static org.apache.hadoop.hive.ser
 import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
 import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
@@ -71,9 +73,11 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -1116,7 +1120,7 @@ public class Hive {
 
   /**
    * Load a directory into a Hive Table Partition - Alters existing content of
-   * the partition with the contents of loadPath. - If he partition does not
+   * the partition with the contents of loadPath. - If the partition does not
    * exist - one is created - files in loadPath are moved into Hive. But the
    * directory itself is not removed.
    *
@@ -1135,7 +1139,7 @@ public class Hive {
    */
   public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
-      boolean inheritTableSpecs)
+      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir)
       throws HiveException {
     Table tbl = getTable(tableName);
     try {
@@ -1187,7 +1191,21 @@ public class Hive {
 
       // recreate the partition if it existed before
       if (!holdDDLTime) {
-        getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
+        Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
+            inheritTableSpecs);
+        if (isSkewedStoreAsSubdir) {
+          org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
+          SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
+          /* Construct list bucketing location mappings from sub-directory name. */
+          Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
+              newPartPath, skewedInfo);
+          /* Add list bucketing location mappings. */
+          skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
+          newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
+          alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+          newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
+          newCreatedTpart = newTPart.getTPartition();
+        }
       }
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
@@ -1195,10 +1213,96 @@ public class Hive {
     } catch (MetaException e) {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
+    } catch (InvalidOperationException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw new HiveException(e);
+    }
+
+  }
+
+  /**
+ * Walk through sub-directory tree to construct list bucketing location map.
+ *
+ * @param fSta
+ * @param fSys
+ * @param skewedColValueLocationMaps
+ * @param newPartPath
+ * @param skewedInfo
+ * @throws IOException
+ */
+private void walkDirTree(FileStatus fSta, FileSystem fSys,
+    Map<List<String>, String> skewedColValueLocationMaps, Path newPartPath, SkewedInfo skewedInfo)
+    throws IOException {
+  /* Base Case. It's leaf. */
+  if (!fSta.isDir()) {
+    /* construct one location map if not exists. */
+    constructOneLBLocationMap(fSta, skewedColValueLocationMaps, newPartPath, skewedInfo);
+    return;
+  }
+
+  /* dfs. */
+  FileStatus[] children = fSys.listStatus(fSta.getPath());
+  if (children != null) {
+    for (FileStatus child : children) {
+      walkDirTree(child, fSys, skewedColValueLocationMaps, newPartPath, skewedInfo);
     }
+  }
+}
 
+/**
+ * Construct a list bucketing location map
+ * @param fSta
+ * @param skewedColValueLocationMaps
+ * @param newPartPath
+ * @param skewedInfo
+ */
+private void constructOneLBLocationMap(FileStatus fSta,
+    Map<List<String>, String> skewedColValueLocationMaps,
+    Path newPartPath, SkewedInfo skewedInfo) {
+  Path lbdPath = fSta.getPath().getParent();
+  List<String> skewedValue = new ArrayList<String>();
+  String lbDirName = FileUtils.unescapePathName(lbdPath.toString());
+  String partDirName = FileUtils.unescapePathName(newPartPath.toString());
+  String lbDirSuffix = lbDirName.replace(partDirName, "");
+  String[] dirNames = lbDirSuffix.split(Path.SEPARATOR);
+  for (String dirName : dirNames) {
+    if ((dirName != null) && (dirName.length() > 0)) {
+      // Construct skewed-value to location map except default directory.
+      // why? query logic knows default-dir structure and don't need to get from map
+        if (!dirName
+            .equalsIgnoreCase(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME)) {
+        String[] kv = dirName.split("=");
+        if (kv.length == 2) {
+          skewedValue.add(kv[1]);
+        }
+      }
+    }
+  }
+  if ((skewedValue.size() > 0) && (skewedValue.size() == skewedInfo.getSkewedColNames().size())
+      && !skewedColValueLocationMaps.containsKey(skewedValue)) {
+    skewedColValueLocationMaps.put(skewedValue, lbDirName);
+  }
+}
+
+  /**
+   * Construct location map from path
+   *
+   * @param newPartPath
+   * @param skewedInfo
+   * @return
+   * @throws IOException
+   * @throws FileNotFoundException
+   */
+  private Map<List<String>, String> constructListBucketingLocationMap(Path newPartPath,
+      SkewedInfo skewedInfo) throws IOException, FileNotFoundException {
+    Map<List<String>, String> skewedColValueLocationMaps = new HashMap<List<String>, String>();
+    FileSystem fSys = newPartPath.getFileSystem(conf);
+    walkDirTree(fSys.getFileStatus(newPartPath), fSys, skewedColValueLocationMaps, newPartPath,
+        skewedInfo);
+    return skewedColValueLocationMaps;
   }
 
+
   /**
    * Given a source directory name of the load path, load all dynamically generated partitions
    * into the specified table and return a list of strings that represent the dynamic partition
@@ -1214,7 +1318,7 @@ public class Hive {
    */
   public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
-      int numDP, boolean holdDDLTime)
+      int numDP, boolean holdDDLTime, boolean listBucketingEnabled)
       throws HiveException {
 
     Set<Path> validPartitions = new HashSet<Path>();
@@ -1263,7 +1367,8 @@ public class Hive {
         fullPartSpecs.add(fullPartSpec);
 
         // finally load the partition -- move the file to the final table address
-        loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true);
+        loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
+            listBucketingEnabled);
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
       return fullPartSpecs;
@@ -2030,9 +2135,8 @@ public class Hive {
    * @param oldPath
    *          The directory where the old data location, need to be cleaned up.
    */
-  static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
-      HiveConf conf) throws HiveException {
-
+  static protected void replaceFiles(Path srcf, Path destf, Path oldPath, HiveConf conf)
+      throws HiveException {
     try {
       FileSystem fs = srcf.getFileSystem(conf);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Wed Dec  5 11:59:15 2012
@@ -614,6 +614,10 @@ public class Partition implements Serial
     tPartition.setLastAccessTime(lastAccessTime);
   }
 
+  public boolean isStoredAsSubDirectories() {
+    return tPartition.getSd().isStoredAsSubDirectories();
+  }
+
   public List<List<String>> getSkewedColValues(){
     return tPartition.getSd().getSkewedInfo().getSkewedColValues();
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java Wed Dec  5 11:59:15 2012
@@ -21,8 +21,11 @@ package org.apache.hadoop.hive.ql.metada
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -34,6 +37,7 @@ import org.apache.hadoop.hive.ql.index.H
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.DescTableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ShowIndexesDesc;
 
 
@@ -209,8 +213,18 @@ public final class MetaDataFormatUtils {
       Map<List<String>, String> skewedColMap = storageDesc.getSkewedInfo()
           .getSkewedColValueLocationMaps();
       if ((skewedColMap!=null) && (skewedColMap.size() > 0)) {
-        formatOutput("Skewed Value to Location Mapping:", skewedColMap.toString(),
+        formatOutput("Skewed Value to Path:", skewedColMap.toString(),
             tableInfo);
+        Map<List<String>, String> truncatedSkewedColMap = new HashMap<List<String>, String>();
+        // walk through existing map to truncate path so that test won't mask it
+        // then we can verify location is right
+        Set<Entry<List<String>, String>> entries = skewedColMap.entrySet();
+        for (Entry<List<String>, String> entry : entries) {
+          truncatedSkewedColMap.put(entry.getKey(),
+              PlanUtils.removePrefixFromWarehouseConfig(entry.getValue()));
+        }
+        formatOutput("Skewed Value to Truncated Path:",
+            truncatedSkewedColMap.toString(), tableInfo);
       }
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Dec  5 11:59:15 2012
@@ -106,8 +106,8 @@ public class GenMRFileSink1 implements N
     Task<? extends Serializable> currTask = ctx.getCurrTask();
     FileSinkOperator fsOp = (FileSinkOperator) nd;
     boolean isInsertTable = // is INSERT OVERWRITE TABLE
-      fsOp.getConf().getTableInfo().getTableName() != null &&
-      parseCtx.getQB().getParseInfo().isInsertToTable();
+    fsOp.getConf().getTableInfo().getTableName() != null &&
+        parseCtx.getQB().getParseInfo().isInsertToTable();
     HiveConf hconf = parseCtx.getConf();
 
     // If this file sink desc has been processed due to a linked file sink desc,
@@ -147,21 +147,20 @@ public class GenMRFileSink1 implements N
                 hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES)) {
               chDir = true;
             }
-          }
-          else {
-            // There are separate configuration parameters to control whether to
-            // merge for a map-only job
-            // or for a map-reduce job
-            MapredWork currWork = (MapredWork) currTask.getWork();
-            boolean mergeMapOnly =
-              hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) &&
-              currWork.getReducer() == null;
-            boolean mergeMapRed =
-              hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES) &&
-              currWork.getReducer() != null;
-            if (mergeMapOnly || mergeMapRed) {
-              chDir = true;
-            }
+          } else {
+              // There are separate configuration parameters to control whether to
+              // merge for a map-only job
+              // or for a map-reduce job
+              MapredWork currWork = (MapredWork) currTask.getWork();
+              boolean mergeMapOnly =
+                  hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES) &&
+                      currWork.getReducer() == null;
+              boolean mergeMapRed =
+                  hconf.getBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES) &&
+                      currWork.getReducer() != null;
+              if (mergeMapOnly || mergeMapRed) {
+                chDir = true;
+              }
           }
         }
       }
@@ -229,20 +228,25 @@ public class GenMRFileSink1 implements N
    * Add the StatsTask as a dependent task of the MoveTask
    * because StatsTask will change the Table/Partition metadata. For atomicity, we
    * should not change it before the data is actually there done by MoveTask.
-   * @param nd the FileSinkOperator whose results are taken care of by the MoveTask.
-   * @param mvTask The MoveTask that moves the FileSinkOperator's results.
-   * @param currTask The MapRedTask that the FileSinkOperator belongs to.
-   * @param hconf HiveConf
+   *
+   * @param nd
+   *          the FileSinkOperator whose results are taken care of by the MoveTask.
+   * @param mvTask
+   *          The MoveTask that moves the FileSinkOperator's results.
+   * @param currTask
+   *          The MapRedTask that the FileSinkOperator belongs to.
+   * @param hconf
+   *          HiveConf
    */
   private void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
       Task<? extends Serializable> currTask, HiveConf hconf) {
 
-    MoveWork mvWork = ((MoveTask)mvTask).getWork();
+    MoveWork mvWork = ((MoveTask) mvTask).getWork();
     StatsWork statsWork = null;
-    if(mvWork.getLoadTableWork() != null){
-       statsWork = new StatsWork(mvWork.getLoadTableWork());
-    }else if (mvWork.getLoadFileWork() != null){
-       statsWork = new StatsWork(mvWork.getLoadFileWork());
+    if (mvWork.getLoadTableWork() != null) {
+      statsWork = new StatsWork(mvWork.getLoadTableWork());
+    } else if (mvWork.getLoadFileWork() != null) {
+      statsWork = new StatsWork(mvWork.getLoadFileWork());
     }
     assert statsWork != null : "Error when genereting StatsTask";
     statsWork.setStatsReliable(hconf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
@@ -250,7 +254,7 @@ public class GenMRFileSink1 implements N
 
     // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
     // in FileSinkDesc is used for stats publishing. They should be consistent.
-    statsWork.setAggKey(((FileSinkOperator)nd).getConf().getStatsAggPrefix());
+    statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
     Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
 
     // mark the MapredWork and FileSinkOperator for gathering stats
@@ -281,7 +285,7 @@ public class GenMRFileSink1 implements N
     ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
     for (ColumnInfo ci : inputRS.getSignature()) {
       valueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(),
-          ci.getTabAlias(), ci.getIsVirtualCol()));
+          ci.getTabAlias(), ci.getIsVirtualCol(), ci.isSkewedCol()));
     }
 
     // create a dummy tableScan operator
@@ -314,7 +318,7 @@ public class GenMRFileSink1 implements N
     Operator<ExtractDesc> extract = OperatorFactory.getAndMakeChild(new ExtractDesc(
         new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
             Utilities.ReduceField.VALUE.toString(), "", false)),
-            new RowSchema(out_rwsch.getColumnInfos()));
+        new RowSchema(out_rwsch.getColumnInfos()));
 
     TableDesc ts = (TableDesc) fsConf.getTableInfo().clone();
     fsConf.getTableInfo().getProperties().remove(
@@ -323,7 +327,7 @@ public class GenMRFileSink1 implements N
     FileSinkDesc newFSD = new FileSinkDesc(finalName, ts, parseCtx.getConf()
         .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT));
     FileSinkOperator newOutput = (FileSinkOperator) OperatorFactory.
-      getAndMakeChild(newFSD, inputRS, extract);
+        getAndMakeChild(newFSD, inputRS, extract);
 
     HiveConf conf = parseCtx.getConf();
     MapredWork cplan = createMergeTask(conf, tsMerge, fsConf);
@@ -344,9 +348,13 @@ public class GenMRFileSink1 implements N
   /**
    * Create a MapReduce job for a particular partition if Hadoop version is pre 0.20,
    * otherwise create a Map-only job using CombineHiveInputFormat for all partitions.
-   * @param fsOp The FileSink operator.
-   * @param ctx The MR processing context.
-   * @param finalName the final destination path the merge job should output.
+   *
+   * @param fsOp
+   *          The FileSink operator.
+   * @param ctx
+   *          The MR processing context.
+   * @param finalName
+   *          the final destination path the merge job should output.
    * @throws SemanticException
    */
   private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName)
@@ -372,32 +380,36 @@ public class GenMRFileSink1 implements N
 
   /**
    * create a Map-only merge job with the following operators:
+   *
    * @param fsInput
    * @param ctx
    * @param finalName
-   *  MR job J0:
+   *          MR job J0:
    *          ...
-   *              |
-   *              v
-   *         FileSinkOperator_1 (fsInput)
-   *             |
-   *             v
-   *  Merge job J1:
-   *             |
-   *             v
-   *         TableScan (using CombineHiveInputFormat) (tsMerge)
-   *             |
-   *             v
-   *         FileSinkOperator (fsMerge)
-   *
-   * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths do
-   * not contain the dynamic partitions (their parent). So after the dynamic partitions are
-   * created (after the first job finished before the moveTask or ConditionalTask start),
-   * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic partition
-   * directories.
+   *          |
+   *          v
+   *          FileSinkOperator_1 (fsInput)
+   *          |
+   *          v
+   *          Merge job J1:
+   *          |
+   *          v
+   *          TableScan (using CombineHiveInputFormat) (tsMerge)
+   *          |
+   *          v
+   *          FileSinkOperator (fsMerge)
+   *
+   *          Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths
+   *          do
+   *          not contain the dynamic partitions (their parent). So after the dynamic partitions are
+   *          created (after the first job finished before the moveTask or ConditionalTask start),
+   *          we need to change the pathToPartitionInfo & pathToAlias to include the dynamic
+   *          partition
+   *          directories.
    *
    */
-  private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName) throws SemanticException {
+  private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName)
+      throws SemanticException {
 
     //
     // 1. create the operator tree
@@ -408,14 +420,14 @@ public class GenMRFileSink1 implements N
     // Create a TableScan operator
     RowSchema inputRS = fsInput.getSchema();
     Operator<? extends OperatorDesc> tsMerge =
-      OperatorFactory.get(TableScanDesc.class, inputRS);
+        OperatorFactory.get(TableScanDesc.class, inputRS);
 
     // Create a FileSink operator
     TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
-    FileSinkDesc fsOutputDesc =  new FileSinkDesc(finalName, ts,
+    FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
         parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSRESULT));
     FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
-        fsOutputDesc,  inputRS, tsMerge);
+        fsOutputDesc, inputRS, tsMerge);
 
     // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema
     // needs to include the partition column, and the fsOutput should have
@@ -427,7 +439,7 @@ public class GenMRFileSink1 implements N
       String tblAlias = fsInputDesc.getTableInfo().getTableName();
       LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
       StringBuilder partCols = new StringBuilder();
-      for (String dpCol: dpCtx.getDPColNames()) {
+      for (String dpCol : dpCtx.getDPColNames()) {
         ColumnInfo colInfo = new ColumnInfo(dpCol,
             TypeInfoFactory.stringTypeInfo, // all partition column type should be string
             tblAlias, true); // partition column is virtual column
@@ -435,7 +447,7 @@ public class GenMRFileSink1 implements N
         colMap.put(dpCol, dpCol); // input and output have the same column name
         partCols.append(dpCol).append('/');
       }
-      partCols.setLength(partCols.length()-1); // remove the last '/'
+      partCols.setLength(partCols.length() - 1); // remove the last '/'
       inputRS.setSignature(signature);
 
       // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
@@ -461,16 +473,16 @@ public class GenMRFileSink1 implements N
         new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
     MapredWork cplan;
 
-    if(parseCtx.getConf().getBoolVar(HiveConf.ConfVars.
+    if (parseCtx.getConf().getBoolVar(HiveConf.ConfVars.
         HIVEMERGERCFILEBLOCKLEVEL) &&
         fsInputDesc.getTableInfo().getInputFileFormatClass().
-        equals(RCFileInputFormat.class)) {
+            equals(RCFileInputFormat.class)) {
 
       // Check if InputFormatClass is valid
       String inputFormatClass = parseCtx.getConf().
           getVar(HiveConf.ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
       try {
-        Class c = (Class <? extends InputFormat>) Class.forName(inputFormatClass);
+        Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
 
         LOG.info("RCFile format- Using block level merge");
         cplan = createRCFileMergeTask(fsInputDesc, finalName,
@@ -492,14 +504,15 @@ public class GenMRFileSink1 implements N
 
     // keep the dynamic partition context in conditional task resolver context
     ConditionalResolverMergeFilesCtx mrCtx =
-      (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
+        (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
     mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
+    mrCtx.setLbCtx(fsInputDesc.getLbCtx());
 
     //
     // 3. add the moveTask as the children of the conditional task
     //
     linkMoveTask(ctx, fsOutput, cndTsk);
- }
+  }
 
   /**
    * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
@@ -542,10 +555,11 @@ public class GenMRFileSink1 implements N
   }
 
   /**
-   * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask.  If mvTask is a
+   * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a
    * load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of
    * dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as
    * well.
+   *
    * @param ctx
    * @param mvTask
    * @param parentTask
@@ -576,11 +590,16 @@ public class GenMRFileSink1 implements N
   /**
    * Create a MapredWork based on input path, the top operator and the input
    * table descriptor.
+   *
    * @param conf
-   * @param topOp the table scan operator that is the root of the MapReduce task.
-   * @param fsDesc the file sink descriptor that serves as the input to this merge task.
-   * @param parentMR the parent MapReduce work
-   * @param parentFS the last FileSinkOperator in the parent MapReduce work
+   * @param topOp
+   *          the table scan operator that is the root of the MapReduce task.
+   * @param fsDesc
+   *          the file sink descriptor that serves as the input to this merge task.
+   * @param parentMR
+   *          the parent MapReduce work
+   * @param parentFS
+   *          the last FileSinkOperator in the parent MapReduce work
    * @return the MapredWork
    */
   private MapredWork createMergeTask(HiveConf conf, Operator<? extends OperatorDesc> topOp,
@@ -604,6 +623,7 @@ public class GenMRFileSink1 implements N
 
   /**
    * Create a block level merge task for RCFiles.
+   *
    * @param fsInputDesc
    * @param finalName
    * @return MergeWork if table is stored as RCFile,
@@ -615,9 +635,10 @@ public class GenMRFileSink1 implements N
     String inputDir = fsInputDesc.getFinalDirName();
     TableDesc tblDesc = fsInputDesc.getTableInfo();
 
-    if(tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+    if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
       ArrayList<String> inputDirs = new ArrayList<String>();
-      if (!hasDynamicPartitions) {
+      if (!hasDynamicPartitions
+          && !isSkewedStoredAsDirs(fsInputDesc)) {
         inputDirs.add(inputDir);
       }
 
@@ -630,10 +651,12 @@ public class GenMRFileSink1 implements N
       work.setPathToAliases(pathToAliases);
       work.setAliasToWork(
           new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
-      if (hasDynamicPartitions) {
+      if (hasDynamicPartitions
+          || isSkewedStoredAsDirs(fsInputDesc)) {
         work.getPathToPartitionInfo().put(inputDir,
             new PartitionDesc(tblDesc, null));
       }
+      work.setListBucketingCtx(fsInputDesc.getLbCtx());
 
       return work;
     }
@@ -642,12 +665,29 @@ public class GenMRFileSink1 implements N
   }
 
   /**
+   * check if it is skewed table and stored as dirs.
+   *
+   * @param fsInputDesc
+   * @return
+   */
+  private boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
+    return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
+        .isSkewedStoredAsDir();
+  }
+
+  /**
    * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
-   * @param conf HiveConf
-   * @param currTask current leaf task
-   * @param mvWork MoveWork for the move task
-   * @param mergeWork MapredWork for the merge task.
-   * @param inputPath the input directory of the merge/move task
+   *
+   * @param conf
+   *          HiveConf
+   * @param currTask
+   *          current leaf task
+   * @param mvWork
+   *          MoveWork for the move task
+   * @param mergeWork
+   *          MapredWork for the merge task.
+   * @param inputPath
+   *          the input directory of the merge/move task
    * @return The conditional task
    */
   private ConditionalTask createCondTask(HiveConf conf,
@@ -658,8 +698,8 @@ public class GenMRFileSink1 implements N
     // 1) Merge the partitions
     // 2) Move the partitions (i.e. don't merge the partitions)
     // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
-    //    merge others) in this case the merge is done first followed by the move to prevent
-    //    conflicts.
+    // merge others) in this case the merge is done first followed by the move to prevent
+    // conflicts.
     Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
     Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
     Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
@@ -687,7 +727,7 @@ public class GenMRFileSink1 implements N
     // create resolver
     cndTsk.setResolver(new ConditionalResolverMergeFiles());
     ConditionalResolverMergeFilesCtx mrCtx =
-      new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
+        new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
     cndTsk.setResolverCtx(mrCtx);
 
     // make the conditional task as the child of the current leaf task
@@ -719,11 +759,15 @@ public class GenMRFileSink1 implements N
 
   /**
    * Process the FileSink operator to generate a MoveTask if necessary.
-   * @param nd current FileSink operator
-   * @param stack parent operators
+   *
+   * @param nd
+   *          current FileSink operator
+   * @param stack
+   *          parent operators
    * @param opProcCtx
-   * @param chDir whether the operator should be first output to a tmp dir and then merged
-   *        to the final dir later
+   * @param chDir
+   *          whether the operator should be first output to a tmp dir and then merged
+   *          to the final dir later
    * @return the final file name to which the FileSinkOperator should store.
    * @throws SemanticException
    */
@@ -783,7 +827,7 @@ public class GenMRFileSink1 implements N
     Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
     String currAliasId = ctx.getCurrAliasId();
     HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
-      ctx.getOpTaskMap();
+        ctx.getOpTaskMap();
     List<Operator<? extends OperatorDesc>> seenOps = ctx.getSeenOps();
     List<Task<? extends Serializable>> rootTasks = ctx.getRootTasks();
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/PrunerOperatorFactory.java Wed Dec  5 11:59:15 2012
@@ -141,10 +141,14 @@ public abstract class PrunerOperatorFact
         TableScanOperator top, ExprNodeDesc new_pruner_pred, Partition part)
         throws UDFArgumentException {
       Map<String, ExprNodeDesc> oldPartToPruner = opToPrunner.get(top);
+      Map<String, ExprNodeDesc> partToPruner = null;
       ExprNodeDesc pruner_pred = null;
       if (oldPartToPruner == null) {
         pruner_pred = new_pruner_pred;
+        // create new mapping
+        partToPruner = new HashMap<String, ExprNodeDesc>();
       } else {
+        partToPruner = oldPartToPruner;
         ExprNodeDesc old_pruner_pred = oldPartToPruner.get(part.getName());
         if (old_pruner_pred != null) {
           // or the old_pruner_pred and the new_ppr_pred
@@ -156,8 +160,8 @@ public abstract class PrunerOperatorFact
       }
 
       // Put the mapping from part to pruner_pred
-      Map<String, ExprNodeDesc> partToPruner = new HashMap<String, ExprNodeDesc>();
       partToPruner.put(part.getName(), pruner_pred);
+
       // Put the mapping from table scan operator to part-pruner map
       opToPrunner.put(top, partToPruner);
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java Wed Dec  5 11:59:15 2012
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.optimizer.PrunerUtils;
@@ -36,17 +35,12 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
  * The transformation step that does list bucketing pruning.
  *
  */
 public class ListBucketingPruner implements Transform {
-
-  public static final String DEFAULT_SKEWED_DIRECTORY = SessionState.get()
-      .getConf().getVar(ConfVars.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME);
-  public static final String DEFAULT_SKEWED_KEY = "HIVE_DEFAULT_LIST_BUCKETING_KEY";
   static final Log LOG = LogFactory.getLog(ListBucketingPruner.class.getName());
 
   /*
@@ -312,9 +306,12 @@ public class ListBucketingPruner impleme
       // Handle skewed value.
       if (skewedValues.contains(cell)) { // if it is skewed value
         if ((matchResult == null) || matchResult) { // add directory to path unless value is false
-          assert (mappings.containsKey(cell)) : "Skewed location mappings doesn't have an entry "
-            + "for a skewed value: " + cell;
-          selectedPaths.add(new Path(mappings.get(cell)));
+          /* It's valid case if a partition: */
+          /* 1. is defined with skewed columns and skewed values in metadata */
+          /* 2. doesn't have all skewed values within its data */
+          if (mappings.get(cell) != null) {
+            selectedPaths.add(new Path(mappings.get(cell)));
+          }
         }
       } else {
         // Non-skewed value, add it to list for later handle on default directory.
@@ -344,8 +341,10 @@ public class ListBucketingPruner impleme
       StringBuilder builder = new StringBuilder();
       builder.append(part.getLocation());
       builder.append(Path.SEPARATOR);
-      builder.append((FileUtils.makeDefaultListBucketingDirName(
-          part.getSkewedColNames(), SessionState.get().getConf())));
+      builder
+          .append((FileUtils.makeDefaultListBucketingDirName(
+              part.getSkewedColNames(),
+              ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME)));
       selectedPaths.add(new Path(builder.toString()));
     }
   }
@@ -480,7 +479,7 @@ public class ListBucketingPruner impleme
         throws SemanticException {
       // Calculate unique skewed elements for each skewed column.
       List<List<String>> uniqSkewedElements = DynamicMultiDimensionalCollection.uniqueElementsList(
-          values, DEFAULT_SKEWED_KEY);
+          values, ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_KEY);
       // Calculate complete dynamic-multi-dimension collection.
       return DynamicMultiDimensionalCollection.flat(uniqSkewedElements);
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPrunerUtils.java Wed Dec  5 11:59:15 2012
@@ -36,6 +36,12 @@ import org.apache.hadoop.hive.ql.udf.gen
  */
 public final class ListBucketingPrunerUtils {
 
+  /* Default list bucketing directory name. internal use only not for client. */
+  public static String HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME =
+      "HIVE_DEFAULT_LIST_BUCKETING_DIR_NAME";
+  /* Default list bucketing directory key. internal use only not for client. */
+  public static String HIVE_LIST_BUCKETING_DEFAULT_KEY = "HIVE_DEFAULT_LIST_BUCKETING_KEY";
+
   /**
    * Decide if pruner skips the skewed directory
    * Input: if the skewed value matches the expression tree
@@ -280,9 +286,9 @@ public final class ListBucketingPrunerUt
     //   but, if unknown. not(c=3) will be unknown. we will choose default dir.
     // 3 all others, return false
     if (cellValueInPosition.equals(constantValueInFilter)
-        && !cellValueInPosition.equals(ListBucketingPruner.DEFAULT_SKEWED_KEY)) {
+        && !cellValueInPosition.equals(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_KEY)) {
       result = Boolean.TRUE;
-    } else if (cellValueInPosition.equals(ListBucketingPruner.DEFAULT_SKEWED_KEY)
+    } else if (cellValueInPosition.equals(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_KEY)
         && !uniqSkewedValuesInPosition.contains(constantValueInFilter)) {
       result = null;
     } else {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java?rev=1417374&r1=1417373&r2=1417374&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTablePartMergeFilesDesc.java Wed Dec  5 11:59:15 2012
@@ -23,13 +23,15 @@ import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 
 @Explain(displayName = "Alter Table Partition Merge Files")
 public class AlterTablePartMergeFilesDesc {
-  
+
   private String tableName;
   private HashMap<String, String> partSpec;
-  
+  private ListBucketingCtx lbCtx; // context for list bucketing.
+
   private List<String> inputDir = new ArrayList<String>();
   private String outputDir = null;
 
@@ -73,4 +75,18 @@ public class AlterTablePartMergeFilesDes
     this.inputDir = inputDir;
   }
 
+  /**
+   * @return the lbCtx
+   */
+  public ListBucketingCtx getLbCtx() {
+    return lbCtx;
+  }
+
+  /**
+   * @param lbCtx the lbCtx to set
+   */
+  public void setLbCtx(ListBucketingCtx lbCtx) {
+    this.lbCtx = lbCtx;
+  }
+
 }