You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2010/02/19 01:58:31 UTC

svn commit: r911664 [1/15] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org...

Author: heyongqiang
Date: Fri Feb 19 00:58:28 2010
New Revision: 911664

URL: http://svn.apache.org/viewvc?rev=911664&view=rev
Log:
HIVE-1178. enforce bucketing for a table.(Namit Jain via He Yongqiang)

Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket3.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/bucketmapjoin4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/ctas.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby_map_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/groupby_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input23.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input42.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/input_part9.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join17.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join26.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join32.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join33.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join34.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join35.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join9.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/join_map_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/louter_join_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/outer_join_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rand_partitionpruner3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/regexp_extract.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/router_join_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/sample9.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/transform_ppr2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/udf_explode.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union22.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/union_ppr.q.out
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/union.q.xml

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Feb 19 00:58:28 2010
@@ -27,6 +27,9 @@
     HIVE-917. Bucketed Map Join
     (He Yongqiang via namit)
 
+    HIVE-1178. enforce bucketing for a table.
+    (Namit Jain via He Yongqiang)
+
   IMPROVEMENTS
 
     HIVE-983. Function from_unixtime takes long.

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Feb 19 00:58:28 2010
@@ -190,6 +190,9 @@
 
     HIVEINPUTFORMAT("hive.input.format", ""),
 
+    HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
+    HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+
     // Optimizer
     HIVEOPTCP("hive.optimize.cp", true), // column pruner
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Fri Feb 19 00:58:28 2010
@@ -308,16 +308,16 @@
 <property>
   <name>hive.skewjoin.mapjoin.map.tasks</name>
   <value>10000</value>
-  <description> Determine the number of map task used in the follow up map join job 
-	for a skew join. It should be used together with hive.skewjoin.mapjoin.min.split 
+  <description> Determine the number of map task used in the follow up map join job
+	for a skew join. It should be used together with hive.skewjoin.mapjoin.min.split
 	to perform a fine grained control.</description>
 </property>
 
 <property>
   <name>hive.skewjoin.mapjoin.min.split</name>
   <value>33554432</value>
-  <description> Determine the number of map task at most used in the follow up map join job 
-	for a skew join by specifying the minimum split size. It should be used together with 
+  <description> Determine the number of map task at most used in the follow up map join job
+	for a skew join by specifying the minimum split size. It should be used together with
 	hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.</description>
 </property>
 
@@ -461,4 +461,10 @@
   <description>Whether speculative execution for reducers should be turned on. </description>
 </property>
 
+<property>
+  <name>hive.enforce.bucketing</name>
+  <value>false</value>
+  <description>Whether bucketing is enforced. If true, while inserting into the table, bucketing is enforced. </description>
+</property>
+
 </configuration>

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Feb 19 00:58:28 2010
@@ -74,6 +74,7 @@
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.varia.NullAppender;
+import org.apache.hadoop.mapred.Partitioner;
 
 /**
  * ExecDriver.
@@ -176,10 +177,10 @@
   /**
    * In Hive, when the user control-c's the command line, any running jobs
    * spawned from that command line are best-effort killed.
-   * 
+   *
    * This static constructor registers a shutdown thread to iterate over all the
    * running job kill URLs and do a get on them.
-   * 
+   *
    */
   static {
     if (new org.apache.hadoop.conf.Configuration().getBoolean(
@@ -271,7 +272,7 @@
    * regular joins rather than map-side joins. Fatal errors are indicated by
    * counters that are set at execution time. If the counter is non-zero, a
    * fatal error occurred. The value of the counter indicates the error type.
-   * 
+   *
    * @return true if fatal errors happened during job execution, false
    *         otherwise.
    */
@@ -374,7 +375,7 @@
   /**
    * Estimate the number of reducers needed for this job, based on job input,
    * and configuration parameters.
-   * 
+   *
    * @return the number of reducers.
    */
   public int estimateNumberOfReducers(HiveConf hive, JobConf job,
@@ -439,7 +440,7 @@
 
   /**
    * Calculate the total size of input files.
-   * 
+   *
    * @param job
    *          the hadoop job conf.
    * @return the total size in bytes.
@@ -553,6 +554,14 @@
 
     job.setMapOutputKeyClass(HiveKey.class);
     job.setMapOutputValueClass(BytesWritable.class);
+
+    try {
+      job.setPartitionerClass((Class<? extends Partitioner>)
+         (Class.forName(HiveConf.getVar(job, HiveConf.ConfVars.HIVEPARTITIONER))));
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e.getMessage());
+    }
+
     if (work.getNumMapTasks() != null) {
       job.setNumMapTasks(work.getNumMapTasks().intValue());
     }
@@ -735,7 +744,7 @@
 
   /**
    * This msg pattern is used to track when a job is started.
-   * 
+   *
    * @param jobId
    * @return
    */
@@ -745,7 +754,7 @@
 
   /**
    * this msg pattern is used to track when a job is successfully done.
-   * 
+   *
    * @param jobId
    * @return
    */

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Fri Feb 19 00:58:28 2010
@@ -20,23 +20,33 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.HivePartitioner;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * File Sink operator implementation.
@@ -55,16 +65,31 @@
   }
 
   private static final long serialVersionUID = 1L;
-  protected transient RecordWriter outWriter;
   protected transient FileSystem fs;
-  protected transient Path outPath;
-  protected transient Path finalPath;
   protected transient Serializer serializer;
   protected transient BytesWritable commonKey = new BytesWritable();
   protected transient TableIdEnum tabIdEnum = null;
   private transient LongWritable row_count;
 
   /**
+   * The evaluators for the multiFile sprayer. If the table under consideration has 1000 buckets,
+   * it is not a good idea to start so many reducers - if the maximum number of reducers is 100,
+   * each reducer can write 10 files - this way we effectively get 1000 files.
+   */
+  private transient ExprNodeEvaluator[] partitionEval;
+  private transient int      totalFiles;
+  private transient int      numFiles;
+  private transient boolean  multiFileSpray;
+  private transient Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+
+  private transient RecordWriter[] outWriters;
+  private transient Path[] outPaths;
+  private transient Path[] finalPaths;
+  private transient ObjectInspector[] partitionObjectInspectors;
+  private transient HivePartitioner<HiveKey, Object> prtner;
+  private transient HiveKey key = new HiveKey();
+
+  /**
    * TableIdEnum.
    *
    */
@@ -88,11 +113,11 @@
 
   protected transient boolean autoDelete = false;
 
-  private void commit() throws IOException {
-    if (!fs.rename(outPath, finalPath)) {
-      throw new IOException("Unable to rename output to: " + finalPath);
+  private void commit(int idx) throws IOException {
+    if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+      throw new IOException("Unable to rename output to: " + finalPaths[idx]);
     }
-    LOG.info("Committed to output file: " + finalPath);
+    LOG.info("Committed " + outPaths[idx] + " to output file: " + finalPaths[idx]);
   }
 
   @Override
@@ -110,42 +135,97 @@
         jc = new JobConf(hconf, ExecDriver.class);
       }
 
-      int id = conf.getDestTableId();
-      if ((id != 0) && (id <= TableIdEnum.values().length)) {
-        String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
-        tabIdEnum = TableIdEnum.valueOf(enumName);
-        row_count = new LongWritable();
-        statsMap.put(tabIdEnum, row_count);
+      multiFileSpray = conf.isMultiFileSpray();
+      totalFiles = conf.getTotalFiles();
+      numFiles   = conf.getNumFiles();
+
+      if (multiFileSpray) {
+        partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+        int i = 0;
+        for (ExprNodeDesc e : conf.getPartitionCols()) {
+          partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
+        }
 
+        partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector);
+        prtner = (HivePartitioner<HiveKey, Object>)ReflectionUtils.newInstance(jc.getPartitionerClass(), null);
       }
+
+      outWriters = new RecordWriter[numFiles];
+      outPaths   = new Path[numFiles];
+      finalPaths = new Path[numFiles];
+
       String specPath = conf.getDirName();
       Path tmpPath = Utilities.toTempPath(specPath);
-      String taskId = Utilities.getTaskId(hconf);
+      Set<Integer> seenBuckets = new HashSet<Integer>();
       fs = (new Path(specPath)).getFileSystem(hconf);
-      finalPath = new Path(tmpPath, taskId);
-      outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
-
-      LOG.info("Writing to temp file: FS " + outPath);
 
       HiveOutputFormat<?, ?> hiveOutputFormat = conf.getTableInfo()
-          .getOutputFileFormatClass().newInstance();
+        .getOutputFileFormatClass().newInstance();
       boolean isCompressed = conf.getCompressed();
-
-      // The reason to keep these instead of using
-      // OutputFormat.getRecordWriter() is that
-      // getRecordWriter does not give us enough control over the file name that
-      // we create.
       Path parent = Utilities.toTempPath(specPath);
-      finalPath = HiveFileFormatUtils.getOutputFormatFinalPath(parent, jc,
-          hiveOutputFormat, isCompressed, finalPath);
-      final Class<? extends Writable> outputClass = serializer
-          .getSerializedClass();
-      outWriter = HiveFileFormatUtils.getHiveRecordWriter(jc, conf
-          .getTableInfo(), outputClass, conf, outPath);
+      final Class<? extends Writable> outputClass = serializer.getSerializedClass();
+
+      // Create all the files - this is required because empty files need to be created for empty buckets
+      int filesIdx = 0;
+      for (int idx = 0; idx < totalFiles; idx++) {
+        String taskId = Utilities.getTaskId(hconf);
+
+        if (multiFileSpray) {
+          key.setHashCode(idx);
+
+          // Does this hashcode belong to this reducer
+          int numReducers = totalFiles/numFiles;
+
+          if (numReducers > 1) {
+            int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf)));
+
+            int reducerIdx = prtner.getPartition(key, null, numReducers);
+            if (currReducer != reducerIdx)
+              continue;
+          }
+
+          int bucketNum = prtner.getBucket(key, null, totalFiles);
+          if (seenBuckets.contains(bucketNum))
+            continue;
+          seenBuckets.add(bucketNum);
+
+          bucketMap.put(bucketNum, filesIdx);
+          taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
+        }
+
+        finalPaths[filesIdx] = new Path(tmpPath, taskId);
+        LOG.info("Final Path: FS " + finalPaths[filesIdx]);
+
+        outPaths[filesIdx] = new Path(tmpPath, Utilities.toTempPath(taskId));
+
+        LOG.info("Writing to temp file: FS " + outPaths[filesIdx]);
+
+        // The reason to keep these instead of using
+        // OutputFormat.getRecordWriter() is that
+        // getRecordWriter does not give us enough control over the file name that
+        // we create.
+        finalPaths[filesIdx] = HiveFileFormatUtils.getOutputFormatFinalPath(parent, taskId, jc,
+                                                                            hiveOutputFormat, isCompressed, finalPaths[filesIdx]);
+        LOG.info("New Final Path: FS " + finalPaths[filesIdx]);
+
+        outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf
+                                                                       .getTableInfo(), outputClass, conf, outPaths[filesIdx]);
+
+        filesIdx++;
+      }
+
+      assert filesIdx == numFiles;
 
       // in recent hadoop versions, use deleteOnExit to clean tmp files.
-      autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs,
-          outPath);
+      autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, outPaths[0]);
+
+      int id = conf.getDestTableId();
+      if ((id != 0) && (id <= TableIdEnum.values().length)) {
+        String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
+        tabIdEnum = TableIdEnum.valueOf(enumName);
+        row_count = new LongWritable();
+        statsMap.put(tabIdEnum, row_count);
+      }
 
       initializeChildren(hconf);
     } catch (HiveException e) {
@@ -180,7 +260,21 @@
         row_count.set(row_count.get() + 1);
       }
 
-      outWriter.write(recordValue);
+      if (!multiFileSpray) {
+        outWriters[0].write(recordValue);
+      }
+      else {
+        int keyHashCode = 0;
+        for (int i = 0; i < partitionEval.length; i++) {
+          Object o = partitionEval[i].evaluate(row);
+          keyHashCode = keyHashCode * 31
+              + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+        }
+        key.setHashCode(keyHashCode);
+        int bucketNum = prtner.getBucket(key, null, totalFiles);
+        int idx = bucketMap.get(bucketNum);
+        outWriters[bucketMap.get(bucketNum)].write(recordValue);
+      }
     } catch (IOException e) {
       throw new HiveException(e);
     } catch (SerDeException e) {
@@ -192,12 +286,14 @@
   public void closeOp(boolean abort) throws HiveException {
 
     if (!abort) {
-      if (outWriter != null) {
-        try {
-          outWriter.close(abort);
-          commit();
-        } catch (IOException e) {
-          throw new HiveException(e);
+      for (int idx = 0; idx < numFiles; idx++) {
+        if (outWriters[idx] != null) {
+          try {
+            outWriters[idx].close(abort);
+            commit(idx);
+          } catch (IOException e) {
+            throw new HiveException(e);
+          }
         }
       }
     } else {
@@ -205,9 +301,11 @@
       // Hadoop always call close() even if an Exception was thrown in map() or
       // reduce().
       try {
-        outWriter.close(abort);
-        if (!autoDelete) {
-          fs.delete(outPath, true);
+        for (int idx = 0; idx < numFiles; idx++) {
+          outWriters[idx].close(abort);
+          if (!autoDelete) {
+            fs.delete(outPaths[idx], true);
+          }
         }
       } catch (Exception e) {
         e.printStackTrace();

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Feb 19 00:58:28 2010
@@ -984,6 +984,24 @@
   }
 
   /**
+   * Replace the task id from the filename. E.g., replace "000000" out of
+   * "24931_r_000000_0" or "24931_r_000000_0.gz" by 33 to
+   * "24931_r_000033_0" or "24931_r_000033_0.gz"
+   */
+  public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
+    String taskId = getTaskIdFromFilename(filename);
+    String strBucketNum = String.valueOf(bucketNum);
+    // Zero-pad the bucket number to the width of the task id it replaces.
+    StringBuilder newTaskId = new StringBuilder();
+    for (int i = strBucketNum.length(); i < taskId.length(); i++) {
+      newTaskId.append('0');
+    }
+    newTaskId.append(strBucketNum);
+    // Literal replace: the task id must not be interpreted as a regex.
+    return filename.replace(taskId, newTaskId.toString());
+  }
+  
+  /**
    * Remove all temporary files and duplicate (double-committed) files from a
    * given directory.
    */

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java Fri Feb 19 00:58:28 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+
+/** Default {@link HivePartitioner}: a hash partitioner that also picks buckets by hash code. */
+public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2>
+    implements HivePartitioner<K2, V2> {
+  /** Maps the key's {@link Object#hashCode()}, sign bit cleared, onto [0, numBuckets). */
+  public int getBucket(K2 key, V2 value, int numBuckets) {
+    final int nonNegativeHash = key.hashCode() & Integer.MAX_VALUE;
+    return nonNegativeHash % numBuckets;
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Fri Feb 19 00:58:28 2010
@@ -50,7 +50,7 @@
  * registerOutputFormatSubstitute(Class, Class) getOutputFormatSubstitute(Class)
  * are added for backward compatibility. They return the newly added
  * HiveOutputFormat for the older ones.
- * 
+ *
  */
 public final class HiveFileFormatUtils {
 
@@ -69,7 +69,7 @@
 
   /**
    * register a substitute.
-   * 
+   *
    * @param origin
    *          the class that need to be substituted
    * @param substitute
@@ -97,17 +97,17 @@
 
   /**
    * get the final output path of a given FileOutputFormat.
-   * 
+   *
    * @param parent
    *          parent dir of the expected final output path
    * @param jc
    *          job configuration
    */
-  public static Path getOutputFormatFinalPath(Path parent, JobConf jc,
+  public static Path getOutputFormatFinalPath(Path parent, String taskId, JobConf jc,
       HiveOutputFormat<?, ?> hiveOutputFormat, boolean isCompressed,
       Path defaultFinalPath) throws IOException {
     if (hiveOutputFormat instanceof HiveIgnoreKeyTextOutputFormat) {
-      return new Path(parent, Utilities.getTaskId(jc)
+      return new Path(parent, taskId
           + Utilities.getFileExtension(jc, isCompressed));
     }
     return defaultFinalPath;
@@ -131,7 +131,7 @@
 
   /**
    * register an InputFormatChecker for a given InputFormat.
-   * 
+   *
    * @param format
    *          the class that need to be substituted
    * @param checker

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePartitioner.java Fri Feb 19 00:58:28 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.mapred.Partitioner;
+
+/** A {@link Partitioner} that can additionally map a key/value pair to a bucket number. */
+public interface HivePartitioner<K2, V2> extends Partitioner<K2, V2> {
+
+  /**
+   * Get the final bucket within a partition, given the number of buckets to choose from.
+   */
+  int getBucket(K2 key, V2 value, int numBuckets);
+}
+

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Feb 19 00:58:28 2010
@@ -300,7 +300,7 @@
   /**
    * DFS-scan the expressionTree to find all aggregation subtrees and put them
    * in aggregations.
-   * 
+   *
    * @param expressionTree
    * @param aggregations
    *          the key to the HashTable is the toStringTree() representation of
@@ -353,7 +353,7 @@
    * Goes though the tabref tree and finds the alias for the table. Once found,
    * it records the table name-> alias association in aliasToTabs. It also makes
    * an association from the alias to the table AST in parse info.
-   * 
+   *
    * @return the alias of the table
    */
   private String processTable(QB qb, ASTNode tabref) throws SemanticException {
@@ -469,7 +469,7 @@
   /**
    * Given the AST with TOK_JOIN as the root, get all the aliases for the tables
    * or subqueries in the join.
-   * 
+   *
    * @param qb
    * @param join
    * @throws SemanticException
@@ -505,7 +505,7 @@
    * Given the AST with TOK_LATERAL_VIEW as the root, get the alias for the
    * table or subquery in the lateral view and also make a mapping from the
    * alias to all the lateral view AST's.
-   * 
+   *
    * @param qb
    * @param lateralView
    * @return the alias for the table/subquery
@@ -541,7 +541,7 @@
 
   /**
    * Phase 1: (including, but not limited to):
-   * 
+   *
    * 1. Gets all the aliases for all the tables / subqueries and makes the
    * appropriate mapping in aliasToTabs, aliasToSubq 2. Gets the location of the
    * destination and names the clase "inclause" + i 3. Creates a map from a
@@ -549,7 +549,7 @@
    * 4. Creates a mapping from the clause name to the select expression AST in
    * destToSelExpr 5. Creates a mapping from a table alias to the lateral view
    * AST's in aliasToLateralViews
-   * 
+   *
    * @param ast
    * @param qb
    * @param ctx_1
@@ -996,7 +996,7 @@
    * condition involves both subtrees and is not a equality. Also, we only
    * support AND i.e ORs are not supported currently as their semantics are not
    * very clear, may lead to data explosion and there is no usecase.
-   * 
+   *
    * @param joinTree
    *          jointree to be populated
    * @param joinCond
@@ -1140,7 +1140,7 @@
 
   /**
    * create a filter plan. The condition and the inputs are specified.
-   * 
+   *
    * @param qb
    *          current query block
    * @param condn
@@ -1898,7 +1898,7 @@
 
   /**
    * Returns the GenericUDAFInfo struct for the aggregation.
-   * 
+   *
    * @param aggName
    *          The name of the UDAF.
    * @param aggParameters
@@ -1962,7 +1962,7 @@
   /**
    * Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)).
    * The new GroupByOperator will be a child of the reduceSinkOperatorInfo.
-   * 
+   *
    * @param mode
    *          The mode of the aggregation (PARTIAL1 or COMPLETE)
    * @param genericUDAFEvaluators
@@ -2062,7 +2062,7 @@
   /**
    * Generate the GroupByOperator for the Query Block (parseInfo.getXXX(dest)).
    * The new GroupByOperator will be a child of the reduceSinkOperatorInfo.
-   * 
+   *
    * @param mode
    *          The mode of the aggregation (MERGEPARTIAL, PARTIAL2)
    * @param genericUDAFEvaluators
@@ -2190,7 +2190,7 @@
    * Generate the map-side GroupByOperator for the Query Block
    * (qb.getParseInfo().getXXX(dest)). The new GroupByOperator will be a child
    * of the inputOperatorInfo.
-   * 
+   *
    * @param mode
    *          The mode of the aggregation (HASH)
    * @param genericUDAFEvaluators
@@ -2303,10 +2303,10 @@
    * Generate the ReduceSinkOperator for the Group By Query Block
    * (qb.getPartInfo().getXXX(dest)). The new ReduceSinkOperator will be a child
    * of inputOperatorInfo.
-   * 
+   *
    * It will put all Group By keys and the distinct field (if any) in the
    * map-reduce sort key, and all other fields in the map-reduce value.
-   * 
+   *
    * @param numPartitionFields
    *          the number of fields for map-reduce partitioning. This is usually
    *          the number of fields in the Group By keys.
@@ -2425,10 +2425,10 @@
    * Generate the second ReduceSinkOperator for the Group By Plan
    * (parseInfo.getXXX(dest)). The new ReduceSinkOperator will be a child of
    * groupByOperatorInfo.
-   * 
+   *
    * The second ReduceSinkOperator will put the group by keys in the map-reduce
    * sort key, and put the partial aggregation results in the map-reduce value.
-   * 
+   *
    * @param numPartitionFields
    *          the number of fields in the map-reduce partition key. This should
    *          always be the same as the number of Group By keys. We should be
@@ -2498,7 +2498,7 @@
    * Generate the second GroupByOperator for the Group By Plan
    * (parseInfo.getXXX(dest)). The new GroupByOperator will do the second
    * aggregation based on the partial aggregation results.
-   * 
+   *
    * @param mode
    *          the mode of aggregation (FINAL)
    * @param genericUDAFEvaluators
@@ -2587,26 +2587,26 @@
   /**
    * Generate a Group-By plan using a single map-reduce job (3 operators will be
    * inserted):
-   * 
+   *
    * ReduceSink ( keys = (K1_EXP, K2_EXP, DISTINCT_EXP), values = (A1_EXP,
    * A2_EXP) ) SortGroupBy (keys = (KEY.0,KEY.1), aggregations =
    * (count_distinct(KEY.2), sum(VALUE.0), count(VALUE.1))) Select (final
    * selects).
-   * 
+   *
    * @param dest
    * @param qb
    * @param input
    * @return
    * @throws SemanticException
-   * 
+   *
    *           Generate a Group-By plan using 1 map-reduce job. Spray by the
    *           group by key, and sort by the distinct key (if any), and compute
    *           aggregates * The agggregation evaluation functions are as
    *           follows: Partitioning Key: grouping key
-   * 
+   *
    *           Sorting Key: grouping key if no DISTINCT grouping + distinct key
    *           if DISTINCT
-   * 
+   *
    *           Reducer: iterate/merge (mode = COMPLETE)
    **/
   @SuppressWarnings({"unused", "nls"})
@@ -2643,30 +2643,30 @@
 
   /**
    * Generate a Multi Group-By plan using a 2 map-reduce jobs.
-   * 
+   *
    * @param dest
    * @param qb
    * @param input
    * @return
    * @throws SemanticException
-   * 
+   *
    *           Generate a Group-By plan using a 2 map-reduce jobs. Spray by the
    *           distinct key in hope of getting a uniform distribution, and
    *           compute partial aggregates by the grouping key. Evaluate partial
    *           aggregates first, and spray by the grouping key to compute actual
    *           aggregates in the second phase. The agggregation evaluation
    *           functions are as follows: Partitioning Key: distinct key
-   * 
+   *
    *           Sorting Key: distinct key
-   * 
+   *
    *           Reducer: iterate/terminatePartial (mode = PARTIAL1)
-   * 
+   *
    *           STAGE 2
-   * 
+   *
    *           Partitioning Key: grouping key
-   * 
+   *
    *           Sorting Key: grouping key
-   * 
+   *
    *           Reducer: merge/terminate (mode = FINAL)
    */
   @SuppressWarnings("nls")
@@ -2701,20 +2701,20 @@
   /**
    * Generate a Group-By plan using a 2 map-reduce jobs (5 operators will be
    * inserted):
-   * 
+   *
    * ReduceSink ( keys = (K1_EXP, K2_EXP, DISTINCT_EXP), values = (A1_EXP,
    * A2_EXP) ) NOTE: If DISTINCT_EXP is null, partition by rand() SortGroupBy
    * (keys = (KEY.0,KEY.1), aggregations = (count_distinct(KEY.2), sum(VALUE.0),
    * count(VALUE.1))) ReduceSink ( keys = (0,1), values=(2,3,4)) SortGroupBy
    * (keys = (KEY.0,KEY.1), aggregations = (sum(VALUE.0), sum(VALUE.1),
    * sum(VALUE.2))) Select (final selects).
-   * 
+   *
    * @param dest
    * @param qb
    * @param input
    * @return
    * @throws SemanticException
-   * 
+   *
    *           Generate a Group-By plan using a 2 map-reduce jobs. Spray by the
    *           grouping key and distinct key (or a random number, if no distinct
    *           is present) in hope of getting a uniform distribution, and
@@ -2724,19 +2724,19 @@
    *           phase. The agggregation evaluation functions are as follows:
    *           Partitioning Key: random() if no DISTINCT grouping + distinct key
    *           if DISTINCT
-   * 
+   *
    *           Sorting Key: grouping key if no DISTINCT grouping + distinct key
    *           if DISTINCT
-   * 
+   *
    *           Reducer: iterate/terminatePartial (mode = PARTIAL1)
-   * 
+   *
    *           STAGE 2
-   * 
+   *
    *           Partitioning Key: grouping key
-   * 
+   *
    *           Sorting Key: grouping key if no DISTINCT grouping + distinct key
    *           if DISTINCT
-   * 
+   *
    *           Reducer: merge/terminate (mode = FINAL)
    */
   @SuppressWarnings("nls")
@@ -2800,15 +2800,15 @@
    * we may turn off map-side partial aggregation based on its performance. Then
    * spray by the group by key, and sort by the distinct key (if any), and
    * compute aggregates based on actual aggregates
-   * 
+   *
    * The agggregation evaluation functions are as follows: Mapper:
    * iterate/terminatePartial (mode = HASH)
-   * 
+   *
    * Partitioning Key: grouping key
-   * 
+   *
    * Sorting Key: grouping key if no DISTINCT grouping + distinct key if
    * DISTINCT
-   * 
+   *
    * Reducer: iterate/terminate if DISTINCT merge/terminate if NO DISTINCT (mode
    * = MERGEPARTIAL)
    */
@@ -2864,23 +2864,23 @@
    * key). Evaluate partial aggregates first, and spray by the grouping key to
    * compute actual aggregates in the second phase. The agggregation evaluation
    * functions are as follows: Mapper: iterate/terminatePartial (mode = HASH)
-   * 
+   *
    * Partitioning Key: random() if no DISTINCT grouping + distinct key if
    * DISTINCT
-   * 
+   *
    * Sorting Key: grouping key if no DISTINCT grouping + distinct key if
    * DISTINCT
-   * 
+   *
    * Reducer: iterate/terminatePartial if DISTINCT merge/terminatePartial if NO
    * DISTINCT (mode = MERGEPARTIAL)
-   * 
+   *
    * STAGE 2
-   * 
+   *
    * Partitioining Key: grouping key
-   * 
+   *
    * Sorting Key: grouping key if no DISTINCT grouping + distinct key if
    * DISTINCT
-   * 
+   *
    * Reducer: merge/terminate (mode = FINAL)
    */
   @SuppressWarnings("nls")
@@ -2890,7 +2890,7 @@
     QBParseInfo parseInfo = qb.getParseInfo();
 
     // ////// Generate GroupbyOperator for a map-side partial aggregation
-    Map<String, GenericUDAFEvaluator> genericUDAFEvaluators = 
+    Map<String, GenericUDAFEvaluator> genericUDAFEvaluators =
       new LinkedHashMap<String, GenericUDAFEvaluator>();
     GroupByOperator groupByOperatorInfo = (GroupByOperator) genGroupByPlanMapGroupByOperator(
         qb, dest, inputOperatorInfo, GroupByDesc.Mode.HASH,
@@ -2961,6 +2961,15 @@
     return input;
   }
 
+  /** Returns the largest divisor of totalFiles that is at most maxReducers (expects both > 0). */
+  private int getReducersBucketing(int totalFiles, int maxReducers) {
+    int numFiles = (totalFiles + maxReducers - 1) / maxReducers; // ceil, so the cap is never exceeded
+    while (totalFiles % numFiles != 0) {
+      numFiles++;
+    }
+    return totalFiles / numFiles;
+  }
+
   @SuppressWarnings("nls")
   private Operator genFileSinkPlan(String dest, QB qb, Operator input)
       throws SemanticException {
@@ -2975,11 +2984,47 @@
     TableDesc table_desc = null;
     int currentTableId = 0;
     boolean isLocal = false;
+    boolean multiFileSpray = false;
+    int numFiles = 1;
+    int totalFiles = 1;
+    ArrayList<ExprNodeDesc> partnCols = null;
 
     switch (dest_type.intValue()) {
     case QBMetaData.DEST_TABLE: {
 
       dest_tab = qbm.getDestTableForAlias(dest);
+
+      // If the table is bucketed, and bucketing is enforced, do the following:
+      // If the number of buckets does not exceed the maximum number of reducers,
+      // create that many reducers (one per bucket).
+      // If not, create a multiFileSink instead of FileSink - the multiFileSink will
+      // spray the data into multiple buckets. That way, we can support a very large
+      // number of buckets without needing a very large number of reducers.
+
+      if ((dest_tab.getNumBuckets() > 0) &&
+          (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
+        int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+        int numBuckets  = dest_tab.getNumBuckets();
+        if (numBuckets > maxReducers) {
+          multiFileSpray = true;
+          totalFiles = numBuckets;
+          if (totalFiles % maxReducers == 0) {
+            numFiles = totalFiles / maxReducers;
+          }
+          else {
+            // find the number of reducers such that it is a divisor of totalFiles
+            maxReducers = getReducersBucketing(totalFiles, maxReducers);
+            numFiles = totalFiles/maxReducers;
+          }
+        }
+        else {
+          maxReducers = numBuckets;
+        }
+
+        partnCols = getParitionColsFromBucketCols(dest_tab, input);
+        input = genReduceSinkPlanForBucketing(dest_tab, input, partnCols, maxReducers);
+      }
+
       // check for partition
       List<FieldSchema> parts = dest_tab.getPartitionKeys();
       if (parts != null && parts.size() > 0) {
@@ -3007,6 +3052,31 @@
 
       Partition dest_part = qbm.getDestPartitionForAlias(dest);
       dest_tab = dest_part.getTable();
+
+      if ((dest_tab.getNumBuckets() > 0) &&
+          (conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
+        int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+        int numBuckets  = dest_tab.getNumBuckets();
+        if (numBuckets > maxReducers) {
+          multiFileSpray = true;
+          totalFiles = numBuckets;
+          if (totalFiles % maxReducers == 0) {
+            numFiles = totalFiles / maxReducers;
+          }
+          else {
+            // find the number of reducers such that it is a divisor of totalFiles
+            maxReducers = getReducersBucketing(totalFiles, maxReducers);
+            numFiles = totalFiles/maxReducers;
+          }
+        }
+        else {
+          maxReducers = numBuckets;
+        }
+
+        partnCols = getParitionColsFromBucketCols(dest_tab, input);
+        input = genReduceSinkPlanForBucketing(dest_tab, input, partnCols, maxReducers);
+      }
+
       dest_path = dest_part.getPath()[0];
       queryTmpdir = ctx.getExternalTmpFileURI(dest_path.toUri());
       table_desc = Utilities.getTableDesc(dest_tab);
@@ -3161,7 +3231,8 @@
 
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
         new FileSinkDesc(queryTmpdir, table_desc, conf
-        .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId),
+        .getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId,
+        multiFileSpray, numFiles, totalFiles, partnCols),
         fsRS, input), inputRR);
 
     LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
@@ -3392,6 +3463,82 @@
     return genLimitPlan(dest, qb, curr, limit);
   }
 
+  private ArrayList<ExprNodeDesc> getParitionColsFromBucketCols(Table tab, Operator input) {
+    RowResolver inputRR = opParseCtx.get(input).getRR();
+    List<String> tabBucketCols = tab.getBucketCols();
+    List<FieldSchema> tabCols  = tab.getCols();
+
+    // Build one partition expression per bucketing column, in getBucketCols() order
+    ArrayList<ExprNodeDesc> partitionCols = new ArrayList<ExprNodeDesc>();
+    for (String bucketCol : tabBucketCols) {
+      int pos = 0; // index of the current table column
+      for (FieldSchema tabCol : tabCols) {
+        if (bucketCol.equals(tabCol.getName())) {
+          ColumnInfo colInfo = inputRR.getColumnInfos().get(pos); // positional lookup: assumes input columns line up with table columns
+          partitionCols.add(new ExprNodeColumnDesc(colInfo.getType(), colInfo
+                                                   .getInternalName(), colInfo.getTabAlias(), colInfo
+                                                   .getIsPartitionCol()));
+
+          break;
+        }
+        pos++;
+      } // a bucket column with no matching table column contributes nothing
+    }
+    return partitionCols;
+  }
+
+  @SuppressWarnings("nls")
+  private Operator genReduceSinkPlanForBucketing(Table tab, Operator input, ArrayList<ExprNodeDesc> partitionCols,
+      int numReducers)
+    throws SemanticException {
+    RowResolver inputRR = opParseCtx.get(input).getRR();
+
+    ArrayList<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>(); // never populated in this method
+
+    // Forward every input column as a reduce value: build one column
+    // expression per input column and index it by its internal name
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+    for (ColumnInfo colInfo : inputRR.getColumnInfos()) {
+      valueCols.add(new ExprNodeColumnDesc(colInfo.getType(), colInfo
+          .getInternalName(), colInfo.getTabAlias(), colInfo
+          .getIsPartitionCol()));
+      colExprMap.put(colInfo.getInternalName(), valueCols
+          .get(valueCols.size() - 1));
+    }
+
+    ArrayList<String> outputColumns = new ArrayList<String>(); // one positional name per value column
+    for (int i = 0; i < valueCols.size(); i++) {
+      outputColumns.add(getColumnInternalName(i));
+    }
+    Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+        .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1,
+                           partitionCols, new String(), numReducers),
+        new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+    interim.setColumnExprMap(colExprMap);
+
+    // Add an extract operator over VALUE; columns are re-registered under positional internal names
+    RowResolver out_rwsch = new RowResolver();
+    RowResolver interim_rwsch = inputRR;
+    Integer pos = Integer.valueOf(0);
+    for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) {
+      String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
+      out_rwsch.put(info[0], info[1], new ColumnInfo(
+          getColumnInternalName(pos), colInfo.getType(), info[0], false));
+      pos = Integer.valueOf(pos.intValue() + 1);
+    }
+
+    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+        Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema(
+        out_rwsch.getColumnInfos()), interim), out_rwsch);
+
+    LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() + " row schema: "
+        + out_rwsch.toString());
+    return output;
+
+  }
+
   @SuppressWarnings("nls")
   private Operator genReduceSinkPlan(String dest, QB qb, Operator input,
       int numReducers) throws SemanticException {
@@ -3705,7 +3852,7 @@
   /**
    * Construct a selection operator for semijoin that filter out all fields
    * other than the group by keys.
-   * 
+   *
    * @param fields
    *          list of fields need to be output
    * @param input
@@ -4735,9 +4882,9 @@
    * expressions are provided on the TABLESAMPLE clause and the table has
    * clustering columns defined in it's metadata. The predicate created has the
    * following structure:
-   * 
+   *
    * ((hash(expressions) & Integer.MAX_VALUE) % denominator) == numerator
-   * 
+   *
    * @param ts
    *          TABLESAMPLE clause information
    * @param bucketCols
@@ -5068,7 +5215,7 @@
   /**
    * Generates the operator DAG needed to implement lateral views and attaches
    * it to the TS operator.
-   * 
+   *
    * @param aliasToOpInfo
    *          A mapping from a table alias to the TS operator. This function
    *          replaces the operator mapping as necessary
@@ -5137,11 +5284,11 @@
    * A helper function that gets all the columns and respective aliases in the
    * source and puts them into dest. It renames the internal names of the
    * columns based on getColumnInternalName(position).
-   * 
+   *
    * Note that this helper method relies on RowResolver.getColumnInfos()
    * returning the columns in the same order as they will be passed in the
    * operator DAG.
-   * 
+   *
    * @param source
    * @param dest
    * @param outputColNames
@@ -5703,7 +5850,7 @@
    * Generates an expression node descriptor for the expression passed in the
    * arguments. This function uses the row resolver and the metadata information
    * that are passed as arguments to resolve the column names to internal names.
-   * 
+   *
    * @param expr
    *          The expression
    * @param input
@@ -5810,7 +5957,7 @@
    * throws and exception in case the same column name is present in multiple
    * table. The exception message indicates that the ambiguity could not be
    * resolved.
-   * 
+   *
    * @param qbm
    *          The metadata where the function looks for the table alias
    * @param colName

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Fri Feb 19 00:58:28 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 
 /**
  * FileSinkDesc.
@@ -33,17 +34,26 @@
   private int destTableId;
   private String compressCodec;
   private String compressType;
+  private boolean multiFileSpray;
+  private int     totalFiles;
+  private ArrayList<ExprNodeDesc> partitionCols;
+  private int     numFiles;
 
   public FileSinkDesc() {
   }
 
   public FileSinkDesc(final String dirName, final TableDesc tableInfo,
-      final boolean compressed, int destTableId) {
+      final boolean compressed, final int destTableId, final boolean multiFileSpray,
+      final int numFiles, final int totalFiles, final ArrayList<ExprNodeDesc> partitionCols) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
     this.compressed = compressed;
     this.destTableId = destTableId;
+    this.multiFileSpray = multiFileSpray;
+    this.numFiles = numFiles;
+    this.totalFiles = totalFiles;
+    this.partitionCols = partitionCols;
   }
 
   public FileSinkDesc(final String dirName, final TableDesc tableInfo,
@@ -53,6 +63,10 @@
     this.tableInfo = tableInfo;
     this.compressed = compressed;
     destTableId = 0;
+    this.multiFileSpray = false;
+    this.numFiles = 1;
+    this.totalFiles = 1;
+    this.partitionCols = null;
   }
 
   @Explain(displayName = "directory", normalExplain = false)
@@ -106,4 +120,63 @@
   public void setCompressType(String intermediateCompressType) {
     compressType = intermediateCompressType;
   }
+
+  /**
+   * @return the multiFileSpray
+   */
+  @Explain(displayName = "MultiFileSpray", normalExplain = false)
+  public boolean isMultiFileSpray() {
+    return multiFileSpray;
+  }
+
+  /**
+   * @param multiFileSpray the multiFileSpray to set
+   */
+  public void setMultiFileSpray(boolean multiFileSpray) {
+    this.multiFileSpray = multiFileSpray;
+  }
+
+  /**
+   * @return the totalFiles
+   */
+  @Explain(displayName = "TotalFiles", normalExplain = false)
+  public int getTotalFiles() {
+    return totalFiles;
+  }
+
+  /**
+   * @param totalFiles the totalFiles to set
+   */
+  public void setTotalFiles(int totalFiles) {
+    this.totalFiles = totalFiles;
+  }
+
+  /**
+   * @return the partitionCols
+   */
+  public ArrayList<ExprNodeDesc> getPartitionCols() {
+    return partitionCols;
+  }
+
+  /**
+   * @param partitionCols the partitionCols to set
+   */
+  public void setPartitionCols(ArrayList<ExprNodeDesc> partitionCols) {
+    this.partitionCols = partitionCols;
+  }
+
+  /**
+   * @return the numFiles
+   */
+  @Explain(displayName = "NumFilesPerFileSink", normalExplain = false)
+  public int getNumFiles() {
+    return numFiles;
+  }
+
+  /**
+   * @param numFiles the numFiles to set
+   */
+  public void setNumFiles(int numFiles) {
+    this.numFiles = numFiles;
+  }
 }

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket1.q Fri Feb 19 00:58:28 2010
@@ -0,0 +1,16 @@
+set hive.enforce.bucketing = true;
+set hive.exec.reducers.max = 200;
+
+drop table bucket1_1;
+CREATE TABLE bucket1_1(key int, value string) CLUSTERED BY (key) INTO 100 BUCKETS;
+
+explain extended
+insert overwrite table bucket1_1
+select * from src;
+
+insert overwrite table bucket1_1
+select * from src;
+
+select * from bucket1_1 order by key;
+
+drop table bucket1_1;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket2.q Fri Feb 19 00:58:28 2010
@@ -0,0 +1,19 @@
+set hive.enforce.bucketing = true;
+set hive.exec.reducers.max = 1;
+
+drop table bucket2_1;
+CREATE TABLE bucket2_1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS;
+
+explain extended
+insert overwrite table bucket2_1
+select * from src;
+
+insert overwrite table bucket2_1
+select * from src;
+
+explain
+select * from bucket2_1 tablesample (bucket 1 out of 2) s order by key;
+
+select * from bucket2_1 tablesample (bucket 1 out of 2) s order by key;
+
+drop table bucket2_1;

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/bucket3.q Fri Feb 19 00:58:28 2010
@@ -0,0 +1,22 @@
+set hive.enforce.bucketing = true;
+set hive.exec.reducers.max = 1;
+
+drop table bucket3_1;
+CREATE TABLE bucket3_1(key int, value string) partitioned by (ds string) CLUSTERED BY (key) INTO 2 BUCKETS;
+
+explain extended
+insert overwrite table bucket3_1 partition (ds='1')
+select * from src;
+
+insert overwrite table bucket3_1 partition (ds='1')
+select * from src;
+
+insert overwrite table bucket3_1 partition (ds='2')
+select * from src;
+
+explain
+select * from bucket3_1 tablesample (bucket 1 out of 2) s where ds = '1' order by key;
+
+select * from bucket3_1 tablesample (bucket 1 out of 2) s where ds = '1' order by key;
+
+drop table bucket3_1;

Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out?rev=911664&r1=911663&r2=911664&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/binary_output_format.q.out Fri Feb 19 00:58:28 2010
@@ -92,7 +92,8 @@
                 File Output Operator
                   compressed: false
                   GlobalTableId: 1
-                  directory: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002
+                  directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002
+                  NumFilesPerFileSink: 1
                   table:
                       input format: org.apache.hadoop.mapred.TextInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
@@ -102,20 +103,22 @@
                         columns.types string
                         file.inputformat org.apache.hadoop.mapred.TextInputFormat
                         file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
-                        location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+                        location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
                         name dest1
                         serialization.ddl struct dest1 { string mydata}
                         serialization.format 1
                         serialization.last.column.takes.rest true
                         serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        transient_lastDdlTime 1266041758
+                        transient_lastDdlTime 1266449090
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                       name: dest1
+                  TotalFiles: 1
+                  MultiFileSpray: false
       Needs Tagging: false
       Path -> Alias:
-        file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/src [src]
+        file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src [src]
       Path -> Partition:
-        file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/src 
+        file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src 
           Partition
             base file name: src
             input format: org.apache.hadoop.mapred.TextInputFormat
@@ -126,12 +129,12 @@
               columns.types string:string
               file.inputformat org.apache.hadoop.mapred.TextInputFormat
               file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-              location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/src
+              location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
               name src
               serialization.ddl struct src { string key, string value}
               serialization.format 1
               serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-              transient_lastDdlTime 1266041757
+              transient_lastDdlTime 1266449089
             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
           
               input format: org.apache.hadoop.mapred.TextInputFormat
@@ -142,12 +145,12 @@
                 columns.types string:string
                 file.inputformat org.apache.hadoop.mapred.TextInputFormat
                 file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/src
+                location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
                 name src
                 serialization.ddl struct src { string key, string value}
                 serialization.format 1
                 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                transient_lastDdlTime 1266041757
+                transient_lastDdlTime 1266449089
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: src
             name: src
@@ -159,14 +162,14 @@
     Move Operator
       files:
           hdfs directory: true
-          source: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002
-          destination: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10000
+          source: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002
+          destination: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10000
 
   Stage: Stage-0
     Move Operator
       tables:
           replace: true
-          source: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10000
+          source: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10000
           table:
               input format: org.apache.hadoop.mapred.TextInputFormat
               output format: org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
@@ -176,21 +179,21 @@
                 columns.types string
                 file.inputformat org.apache.hadoop.mapred.TextInputFormat
                 file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
-                location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+                location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
                 name dest1
                 serialization.ddl struct dest1 { string mydata}
                 serialization.format 1
                 serialization.last.column.takes.rest true
                 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                transient_lastDdlTime 1266041758
+                transient_lastDdlTime 1266449090
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: dest1
-          tmp directory: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10001
+          tmp directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10001
 
   Stage: Stage-2
     Map Reduce
       Alias -> Map Operator Tree:
-        file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002 
+        file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002 
             Reduce Output Operator
               sort order: 
               Map-reduce partition columns:
@@ -202,9 +205,9 @@
                     type: string
       Needs Tagging: false
       Path -> Alias:
-        file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002 [file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002]
+        file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002 [file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002]
       Path -> Partition:
-        file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10002 
+        file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10002 
           Partition
             base file name: 10002
             input format: org.apache.hadoop.mapred.TextInputFormat
@@ -215,13 +218,13 @@
               columns.types string
               file.inputformat org.apache.hadoop.mapred.TextInputFormat
               file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
-              location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+              location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
               name dest1
               serialization.ddl struct dest1 { string mydata}
               serialization.format 1
               serialization.last.column.takes.rest true
               serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-              transient_lastDdlTime 1266041758
+              transient_lastDdlTime 1266449090
             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
           
               input format: org.apache.hadoop.mapred.TextInputFormat
@@ -232,13 +235,13 @@
                 columns.types string
                 file.inputformat org.apache.hadoop.mapred.TextInputFormat
                 file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
-                location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+                location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
                 name dest1
                 serialization.ddl struct dest1 { string mydata}
                 serialization.format 1
                 serialization.last.column.takes.rest true
                 serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                transient_lastDdlTime 1266041758
+                transient_lastDdlTime 1266449090
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: dest1
             name: dest1
@@ -247,7 +250,8 @@
           File Output Operator
             compressed: false
             GlobalTableId: 0
-            directory: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-15-58_271_3464965796195557656/10000
+            directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-50_051_1929056605300008322/10000
+            NumFilesPerFileSink: 1
             table:
                 input format: org.apache.hadoop.mapred.TextInputFormat
                 output format: org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
@@ -257,15 +261,17 @@
                   columns.types string
                   file.inputformat org.apache.hadoop.mapred.TextInputFormat
                   file.outputformat org.apache.hadoop.hive.ql.io.HiveBinaryOutputFormat
-                  location file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/test/data/warehouse/dest1
+                  location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/dest1
                   name dest1
                   serialization.ddl struct dest1 { string mydata}
                   serialization.format 1
                   serialization.last.column.takes.rest true
                   serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                  transient_lastDdlTime 1266041758
+                  transient_lastDdlTime 1266449090
                 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                 name: dest1
+            TotalFiles: 1
+            MultiFileSpray: false
 
 
 PREHOOK: query: INSERT OVERWRITE TABLE dest1
@@ -300,12 +306,12 @@
 SELECT * FROM dest1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@dest1
-PREHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-16-03_636_3562459403060191123/10000
+PREHOOK: Output: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-54_510_6167292116608425875/10000
 POSTHOOK: query: -- Test the result
 SELECT * FROM dest1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dest1
-POSTHOOK: Output: file:/data/users/zshao/hadoop_hive_trunk2/.ptest_1/build/ql/scratchdir/hive_2010-02-12_22-16-03_636_3562459403060191123/10000
+POSTHOOK: Output: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-24-54_510_6167292116608425875/10000
 238	val_238
 86	val_86
 311	val_311

Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out?rev=911664&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/bucket1.q.out Fri Feb 19 00:58:28 2010
@@ -0,0 +1,673 @@
+PREHOOK: query: drop table bucket1_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket1_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE bucket1_1(key int, value string) CLUSTERED BY (key) INTO 100 BUCKETS
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE bucket1_1(key int, value string) CLUSTERED BY (key) INTO 100 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@bucket1_1
+PREHOOK: query: explain extended
+insert overwrite table bucket1_1
+select * from src
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended
+insert overwrite table bucket1_1
+select * from src
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB bucket1_1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src 
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              Reduce Output Operator
+                sort order: 
+                Map-reduce partition columns:
+                      expr: _col0
+                      type: string
+                tag: -1
+                value expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+      Needs Tagging: false
+      Path -> Alias:
+        file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src [src]
+      Path -> Partition:
+        file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src 
+          Partition
+            base file name: src
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              bucket_count -1
+              columns key,value
+              columns.types string:string
+              file.inputformat org.apache.hadoop.mapred.TextInputFormat
+              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
+              name src
+              serialization.ddl struct src { string key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              transient_lastDdlTime 1266535134
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                bucket_count -1
+                columns key,value
+                columns.types string:string
+                file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/src
+                name src
+                serialization.ddl struct src { string key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                transient_lastDdlTime 1266535134
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: src
+            name: src
+      Reduce Operator Tree:
+        Extract
+          Select Operator
+            expressions:
+                  expr: UDFToInteger(_col0)
+                  type: int
+                  expr: _col1
+                  type: string
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-54_872_8904445139398831015/10000
+              NumFilesPerFileSink: 1
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  properties:
+                    bucket_count 100
+                    bucket_field_name key
+                    columns key,value
+                    columns.types int:string
+                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/bucket1_1
+                    name bucket1_1
+                    serialization.ddl struct bucket1_1 { i32 key, string value}
+                    serialization.format 1
+                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    transient_lastDdlTime 1266535134
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: bucket1_1
+              TotalFiles: 1
+              MultiFileSpray: false
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          source: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-54_872_8904445139398831015/10000
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                bucket_count 100
+                bucket_field_name key
+                columns key,value
+                columns.types int:string
+                file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                location file:/data/users/njain/hive1/hive1/build/ql/test/data/warehouse/bucket1_1
+                name bucket1_1
+                serialization.ddl struct bucket1_1 { i32 key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                transient_lastDdlTime 1266535134
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: bucket1_1
+          tmp directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-54_872_8904445139398831015/10001
+
+
+PREHOOK: query: insert overwrite table bucket1_1
+select * from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@bucket1_1
+POSTHOOK: query: insert overwrite table bucket1_1
+select * from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@bucket1_1
+PREHOOK: query: select * from bucket1_1 order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket1_1
+PREHOOK: Output: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-59_053_2682513110201565086/10000
+POSTHOOK: query: select * from bucket1_1 order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket1_1
+POSTHOOK: Output: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-18_15-18-59_053_2682513110201565086/10000
+0	val_0
+0	val_0
+0	val_0
+2	val_2
+4	val_4
+5	val_5
+5	val_5
+5	val_5
+8	val_8
+9	val_9
+10	val_10
+11	val_11
+12	val_12
+12	val_12
+15	val_15
+15	val_15
+17	val_17
+18	val_18
+18	val_18
+19	val_19
+20	val_20
+24	val_24
+24	val_24
+26	val_26
+26	val_26
+27	val_27
+28	val_28
+30	val_30
+33	val_33
+34	val_34
+35	val_35
+35	val_35
+35	val_35
+37	val_37
+37	val_37
+41	val_41
+42	val_42
+42	val_42
+43	val_43
+44	val_44
+47	val_47
+51	val_51
+51	val_51
+53	val_53
+54	val_54
+57	val_57
+58	val_58
+58	val_58
+64	val_64
+65	val_65
+66	val_66
+67	val_67
+67	val_67
+69	val_69
+70	val_70
+70	val_70
+70	val_70
+72	val_72
+72	val_72
+74	val_74
+76	val_76
+76	val_76
+77	val_77
+78	val_78
+80	val_80
+82	val_82
+83	val_83
+83	val_83
+84	val_84
+84	val_84
+85	val_85
+86	val_86
+87	val_87
+90	val_90
+90	val_90
+90	val_90
+92	val_92
+95	val_95
+95	val_95
+96	val_96
+97	val_97
+97	val_97
+98	val_98
+98	val_98
+100	val_100
+100	val_100
+103	val_103
+103	val_103
+104	val_104
+104	val_104
+105	val_105
+111	val_111
+113	val_113
+113	val_113
+114	val_114
+116	val_116
+118	val_118
+118	val_118
+119	val_119
+119	val_119
+119	val_119
+120	val_120
+120	val_120
+125	val_125
+125	val_125
+126	val_126
+128	val_128
+128	val_128
+128	val_128
+129	val_129
+129	val_129
+131	val_131
+133	val_133
+134	val_134
+134	val_134
+136	val_136
+137	val_137
+137	val_137
+138	val_138
+138	val_138
+138	val_138
+138	val_138
+143	val_143
+145	val_145
+146	val_146
+146	val_146
+149	val_149
+149	val_149
+150	val_150
+152	val_152
+152	val_152
+153	val_153
+155	val_155
+156	val_156
+157	val_157
+158	val_158
+160	val_160
+162	val_162
+163	val_163
+164	val_164
+164	val_164
+165	val_165
+165	val_165
+166	val_166
+167	val_167
+167	val_167
+167	val_167
+168	val_168
+169	val_169
+169	val_169
+169	val_169
+169	val_169
+170	val_170
+172	val_172
+172	val_172
+174	val_174
+174	val_174
+175	val_175
+175	val_175
+176	val_176
+176	val_176
+177	val_177
+178	val_178
+179	val_179
+179	val_179
+180	val_180
+181	val_181
+183	val_183
+186	val_186
+187	val_187
+187	val_187
+187	val_187
+189	val_189
+190	val_190
+191	val_191
+191	val_191
+192	val_192
+193	val_193
+193	val_193
+193	val_193
+194	val_194
+195	val_195
+195	val_195
+196	val_196
+197	val_197
+197	val_197
+199	val_199
+199	val_199
+199	val_199
+200	val_200
+200	val_200
+201	val_201
+202	val_202
+203	val_203
+203	val_203
+205	val_205
+205	val_205
+207	val_207
+207	val_207
+208	val_208
+208	val_208
+208	val_208
+209	val_209
+209	val_209
+213	val_213
+213	val_213
+214	val_214
+216	val_216
+216	val_216
+217	val_217
+217	val_217
+218	val_218
+219	val_219
+219	val_219
+221	val_221
+221	val_221
+222	val_222
+223	val_223
+223	val_223
+224	val_224
+224	val_224
+226	val_226
+228	val_228
+229	val_229
+229	val_229
+230	val_230
+230	val_230
+230	val_230
+230	val_230
+230	val_230
+233	val_233
+233	val_233
+235	val_235
+237	val_237
+237	val_237
+238	val_238
+238	val_238
+239	val_239
+239	val_239
+241	val_241
+242	val_242
+242	val_242
+244	val_244
+247	val_247
+248	val_248
+249	val_249
+252	val_252
+255	val_255
+255	val_255
+256	val_256
+256	val_256
+257	val_257
+258	val_258
+260	val_260
+262	val_262
+263	val_263
+265	val_265
+265	val_265
+266	val_266
+272	val_272
+272	val_272
+273	val_273
+273	val_273
+273	val_273
+274	val_274
+275	val_275
+277	val_277
+277	val_277
+277	val_277
+277	val_277
+278	val_278
+278	val_278
+280	val_280
+280	val_280
+281	val_281
+281	val_281
+282	val_282
+282	val_282
+283	val_283
+284	val_284
+285	val_285
+286	val_286
+287	val_287
+288	val_288
+288	val_288
+289	val_289
+291	val_291
+292	val_292
+296	val_296
+298	val_298
+298	val_298
+298	val_298
+302	val_302
+305	val_305
+306	val_306
+307	val_307
+307	val_307
+308	val_308
+309	val_309
+309	val_309
+310	val_310
+311	val_311
+311	val_311
+311	val_311
+315	val_315
+316	val_316
+316	val_316
+316	val_316
+317	val_317
+317	val_317
+318	val_318
+318	val_318
+318	val_318
+321	val_321
+321	val_321
+322	val_322
+322	val_322
+323	val_323
+325	val_325
+325	val_325
+327	val_327
+327	val_327
+327	val_327
+331	val_331
+331	val_331
+332	val_332
+333	val_333
+333	val_333
+335	val_335
+336	val_336
+338	val_338
+339	val_339
+341	val_341
+342	val_342
+342	val_342
+344	val_344
+344	val_344
+345	val_345
+348	val_348
+348	val_348
+348	val_348
+348	val_348
+348	val_348
+351	val_351
+353	val_353
+353	val_353
+356	val_356
+360	val_360
+362	val_362
+364	val_364
+365	val_365
+366	val_366
+367	val_367
+367	val_367
+368	val_368
+369	val_369
+369	val_369
+369	val_369
+373	val_373
+374	val_374
+375	val_375
+377	val_377
+378	val_378
+379	val_379
+382	val_382
+382	val_382
+384	val_384
+384	val_384
+384	val_384
+386	val_386
+389	val_389
+392	val_392
+393	val_393
+394	val_394
+395	val_395
+395	val_395
+396	val_396
+396	val_396
+396	val_396
+397	val_397
+397	val_397
+399	val_399
+399	val_399
+400	val_400
+401	val_401
+401	val_401
+401	val_401
+401	val_401
+401	val_401
+402	val_402
+403	val_403
+403	val_403
+403	val_403
+404	val_404
+404	val_404
+406	val_406
+406	val_406
+406	val_406
+406	val_406
+407	val_407
+409	val_409
+409	val_409
+409	val_409
+411	val_411
+413	val_413
+413	val_413
+414	val_414
+414	val_414
+417	val_417
+417	val_417
+417	val_417
+418	val_418
+419	val_419
+421	val_421
+424	val_424
+424	val_424
+427	val_427
+429	val_429
+429	val_429
+430	val_430
+430	val_430
+430	val_430
+431	val_431
+431	val_431
+431	val_431
+432	val_432
+435	val_435
+436	val_436
+437	val_437
+438	val_438
+438	val_438
+438	val_438
+439	val_439
+439	val_439
+443	val_443
+444	val_444
+446	val_446
+448	val_448
+449	val_449
+452	val_452
+453	val_453
+454	val_454
+454	val_454
+454	val_454
+455	val_455
+457	val_457
+458	val_458
+458	val_458
+459	val_459
+459	val_459
+460	val_460
+462	val_462
+462	val_462
+463	val_463
+463	val_463
+466	val_466
+466	val_466
+466	val_466
+467	val_467
+468	val_468
+468	val_468
+468	val_468
+468	val_468
+469	val_469
+469	val_469
+469	val_469
+469	val_469
+469	val_469
+470	val_470
+472	val_472
+475	val_475
+477	val_477
+478	val_478
+478	val_478
+479	val_479
+480	val_480
+480	val_480
+480	val_480
+481	val_481
+482	val_482
+483	val_483
+484	val_484
+485	val_485
+487	val_487
+489	val_489
+489	val_489
+489	val_489
+489	val_489
+490	val_490
+491	val_491
+492	val_492
+492	val_492
+493	val_493
+494	val_494
+495	val_495
+496	val_496
+497	val_497
+498	val_498
+498	val_498
+498	val_498
+PREHOOK: query: drop table bucket1_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket1_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@bucket1_1