Posted to commits@hive.apache.org by na...@apache.org on 2013/04/03 09:05:41 UTC

svn commit: r1463841 [1/3] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/metadata/ ql/src/java/org/a...

Author: namit
Date: Wed Apr  3 07:05:40 2013
New Revision: 1463841

URL: http://svn.apache.org/r1463841
Log:
HIVE-4240 optimize hive.enforce.bucketing and hive.enforce.sorting insert
(Gang Tim Liu via namit)


Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
    hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q
    hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q
    hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q
    hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q
    hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_18.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_19.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_20.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_21.q.out
    hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_22.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java Wed Apr  3 07:05:40 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.common;
 
+
+
 public class ObjectPair<F, S> {
   private F first;
   private S second;
@@ -44,4 +46,24 @@ public class ObjectPair<F, S> {
   public void setSecond(S second) {
     this.second = second;
   }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null) {
+      return false;
+    }
+    if (that instanceof ObjectPair) {
+      return this.equals((ObjectPair<F, S>)that);
+    }
+    return false;
+  }
+
+  public boolean equals(ObjectPair<F, S> that) {
+    if (that == null) {
+      return false;
+    }
+
+    return this.getFirst().equals(that.getFirst()) &&
+        this.getSecond().equals(that.getSecond());
+  }
 }
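
A quick usage sketch of the new equals() overloads (not part of the patch; it assumes ObjectPair's existing two-argument constructor). The optimizer added below compares List<ObjectPair<Integer, Integer>> values of (column position, sort order), and List.equals() delegates to these element-wise:

// Illustrative only -- not committed code.
import org.apache.hadoop.hive.common.ObjectPair;

public class ObjectPairEqualsDemo {
  public static void main(String[] args) {
    // (column position, sort order) pairs, as produced by getSortPositions() in the optimizer
    ObjectPair<Integer, Integer> a = new ObjectPair<Integer, Integer>(0, 1);
    ObjectPair<Integer, Integer> b = new ObjectPair<Integer, Integer>(0, 1);
    ObjectPair<Integer, Integer> c = new ObjectPair<Integer, Integer>(1, 1);
    System.out.println(a.equals(b)); // true: same position and order
    System.out.println(a.equals(c)); // false: positions differ
    // Note: the new equals() assumes a non-null first/second on the receiver.
  }
}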

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Apr  3 07:05:40 2013
@@ -513,6 +513,7 @@ public class HiveConf extends Configurat
 
     HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
     HIVEENFORCESORTING("hive.enforce.sorting", false),
+    HIVEOPTIMIZEBUCKETINGSORTING("hive.optimize.bucketingsorting", true),
     HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
     HIVEENFORCESORTMERGEBUCKETMAPJOIN("hive.enforce.sortmergebucketmapjoin", false),
     HIVEENFORCEBUCKETMAPJOIN("hive.enforce.bucketmapjoin", false),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Wed Apr  3 07:05:40 2013
@@ -933,6 +933,16 @@
 </property>
 
 <property>
+  <name>hive.optimize.bucketingsorting</name>
+  <value>true</value>
+  <description>If hive.enforce.bucketing or hive.enforce.sorting is true, don't create a reducer for enforcing
+    bucketing/sorting for queries of the form:
+    insert overwrite table T2 select * from T1;
+    where T1 and T2 are bucketed/sorted by the same keys into the same number of buckets.
+  </description>
+</property>
+
+<property>
   <name>hive.enforce.sortmergebucketmapjoin</name>
   <value>false</value>
   <description>If the user asked for sort-merge bucketed map-side join, and it cannot be performed,
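
A small sketch (not part of the patch) of the new flag from both ends: in a session it is simply set hive.optimize.bucketingsorting=false; and programmatically it is read and written through the HIVEOPTIMIZEBUCKETINGSORTING entry added to HiveConf above:

// Illustrative only -- not committed code.
import org.apache.hadoop.hive.conf.HiveConf;

public class BucketingSortingFlagDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // true by default, per the template entry above
    boolean enabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEOPTIMIZEBUCKETINGSORTING);
    System.out.println("hive.optimize.bucketingsorting = " + enabled);
    // turning it off keeps the enforce-bucketing/sorting reducer in the plan
    conf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEBUCKETINGSORTING, false);
  }
}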

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Wed Apr  3 07:05:40 2013
@@ -94,6 +94,19 @@ public class TableScanOperator extends O
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
     inputFileChanged = true;
+    // If the file name to bucket number mapping is maintained, store the bucket number
+    // in the execution context. This is needed for the following scenario:
+    // insert overwrite table T1 select * from T2;
+    // where T1 and T2 are sorted/bucketed by the same keys into the same number of buckets
+    // Although one mapper per file is used (BucketizedHiveInputFormat), it is possible that
+    // any mapper can pick up any file (depending on the size of the files). The bucket number
+    // corresponding to the input file is stored to name the output bucket file appropriately.
+    Map<String, Integer> bucketNameMapping = conf != null ? conf.getBucketFileNameMapping() : null;
+    if ((bucketNameMapping != null) && (!bucketNameMapping.isEmpty())) {
+      String currentInputFile = getExecContext().getCurrentInputFile();
+      getExecContext().setFileId(Integer.toString(bucketNameMapping.get(
+          Utilities.getFileNameFromDirName(currentInputFile))));
+    }
   }
 
   private void gatherStats(Object row) {
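
To make the cleanUpInputFileChangedOp() change concrete, here is a standalone sketch (file names and path handling are illustrative, not committed code) of the lookup it performs: a mapper that happens to read bucket file 000001_0 records file id 1, so its output is written as bucket 1 of the destination table.

// Illustrative only -- not committed code.
import java.util.HashMap;
import java.util.Map;

public class BucketFileIdLookupDemo {
  public static void main(String[] args) {
    // mapping built by the optimizer from the sorted source files (see storeBucketPathMapping)
    Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>();
    bucketFileNameMapping.put("000000_0", 0);
    bucketFileNameMapping.put("000001_0", 1);

    // hypothetical current input file of this mapper
    String currentInputFile = "hdfs://nn:8020/warehouse/test_table1/ds=1/000001_0";
    // rough stand-in for Utilities.getFileNameFromDirName(currentInputFile)
    String fileName = currentInputFile.substring(currentInputFile.lastIndexOf('/') + 1);
    String fileId = Integer.toString(bucketFileNameMapping.get(fileName));
    System.out.println("output bucket file id = " + fileId); // prints 1
  }
}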

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Wed Apr  3 07:05:40 2013
@@ -384,11 +384,10 @@ public class Partition implements Serial
   }
 
   /**
-   * mapping from bucket number to bucket path
+   * get all paths for this partition in a sorted manner
    */
-  // TODO: add test case and clean it up
   @SuppressWarnings("nls")
-  public Path getBucketPath(int bucketNum) {
+  public FileStatus[] getSortedPaths() {
     try {
       // Previously, this got the filesystem of the Table, which could be
       // different from the filesystem of the partition.
@@ -407,11 +406,23 @@ public class Partition implements Serial
       if (srcs.length == 0) {
         return null;
       }
-      return srcs[bucketNum].getPath();
+      return srcs;
     } catch (Exception e) {
-      throw new RuntimeException("Cannot get bucket path for bucket "
-          + bucketNum, e);
+      throw new RuntimeException("Cannot get path ", e);
+    }
+  }
+
+  /**
+   * mapping from bucket number to bucket path
+   */
+  // TODO: add test case and clean it up
+  @SuppressWarnings("nls")
+  public Path getBucketPath(int bucketNum) {
+    FileStatus srcs[] = getSortedPaths();
+    if (srcs == null) {
+      return null;
     }
+    return srcs[bucketNum].getPath();
   }
 
   @SuppressWarnings("nls")

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Apr  3 07:05:40 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
@@ -925,4 +927,30 @@ public class Table implements Serializab
     Hive hive = Hive.get();
     return hive.getIndexes(getTTable().getDbName(), getTTable().getTableName(), max);
   }
+
+  @SuppressWarnings("nls")
+  public FileStatus[] getSortedPaths() {
+    try {
+      // Previously, this got the filesystem of the Table, which could be
+      // different from the filesystem of the partition.
+      FileSystem fs = FileSystem.get(getPath().toUri(), Hive.get()
+          .getConf());
+      String pathPattern = getPath().toString();
+      if (getNumBuckets() > 0) {
+        pathPattern = pathPattern + "/*";
+      }
+      LOG.info("Path pattern = " + pathPattern);
+      FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
+      Arrays.sort(srcs);
+      for (FileStatus src : srcs) {
+        LOG.info("Got file: " + src.getPath());
+      }
+      if (srcs.length == 0) {
+        return null;
+      }
+      return srcs;
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot get path ", e);
+    }
+  }
 };
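
Both getSortedPaths() methods (Partition above and Table here) depend on the globbed FileStatus entries being sorted so that array index equals bucket number; FileStatus compares by path, so lexicographic file-name order lines up with buckets 0, 1, 2, and so on. A minimal sketch of that idea using plain strings (file names are illustrative, not committed code):

// Illustrative only -- not committed code.
import java.util.Arrays;

public class SortedBucketFilesDemo {
  public static void main(String[] args) {
    // an unsorted directory listing of bucket files
    String[] files = {"000001_0", "000000_0", "000002_0"};
    Arrays.sort(files);
    for (int bucket = 0; bucket < files.length; bucket++) {
      System.out.println("bucket " + bucket + " -> " + files[bucket]);
    }
  }
}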

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Wed Apr  3 07:05:40 2013
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * This transformation does optimization for enforcing bucketing and sorting.
+ * For a query of the form:
+ * insert overwrite table T1 select * from T2;
+ * where T1 and T2 are bucketized/sorted on the same keys, we don't need a reducer to
+ * enforce bucketing and sorting.
+ */
+public class BucketingSortingReduceSinkOptimizer implements Transform {
+
+  private static final Log LOG = LogFactory.getLog(BucketingSortingReduceSinkOptimizer.class
+      .getName());
+
+  public BucketingSortingReduceSinkOptimizer() {
+  }
+
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    HiveConf conf = pctx.getConf();
+
+    // process reduce sink added by hive.enforce.bucketing or hive.enforce.sorting
+    opRules.put(new RuleRegExp("R1",
+        ReduceSinkOperator.getOperatorName() + "%" +
+            ExtractOperator.getOperatorName() + "%" +
+            FileSinkOperator.getOperatorName() + "%"),
+        getBucketSortReduceSinkProc(pctx));
+
+    // The dispatcher fires the processor corresponding to the closest matching rule
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, null);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topop nodes
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pctx;
+  }
+
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    };
+  }
+
+  private NodeProcessor getBucketSortReduceSinkProc(ParseContext pctx) {
+    return new BucketSortReduceSinkProcessor(pctx);
+  }
+
+  /**
+   * BucketSortReduceSinkProcessor.
+   *
+   */
+  public class BucketSortReduceSinkProcessor implements NodeProcessor {
+
+    protected ParseContext pGraphContext;
+
+    public BucketSortReduceSinkProcessor(ParseContext pGraphContext) {
+      this.pGraphContext = pGraphContext;
+    }
+
+    // Get the bucket positions for the table
+    private List<Integer> getBucketPositions(List<String> tabBucketCols,
+        List<FieldSchema> tabCols) {
+      List<Integer> posns = new ArrayList<Integer>();
+      for (String bucketCol : tabBucketCols) {
+        int pos = 0;
+        for (FieldSchema tabCol : tabCols) {
+          if (bucketCol.equals(tabCol.getName())) {
+            posns.add(pos);
+            break;
+          }
+          pos++;
+        }
+      }
+      return posns;
+    }
+
+    // Get the sort positions and sort order for the table
+    private List<ObjectPair<Integer, Integer>> getSortPositions(List<Order> tabSortCols,
+        List<FieldSchema> tabCols) {
+      List<ObjectPair<Integer, Integer>> posns = new ArrayList<ObjectPair<Integer, Integer>>();
+      for (Order sortCol : tabSortCols) {
+        int pos = 0;
+        for (FieldSchema tabCol : tabCols) {
+          if (sortCol.getCol().equals(tabCol.getName())) {
+            posns.add(new ObjectPair<Integer, Integer>(pos, sortCol.getOrder()));
+            break;
+          }
+          pos++;
+        }
+      }
+      return posns;
+    }
+
+    // Return true if the partition is bucketed/sorted by the specified positions
+    // The number of buckets and the sort order should also match, along with the
+    // columns which are bucketed/sorted
+    private boolean checkPartition(Partition partition,
+        List<Integer> bucketPositionsDest,
+        List<ObjectPair<Integer, Integer>> sortPositionsDest,
+        int numBucketsDest) {
+      // The bucketing and sorting positions should exactly match
+      int numBuckets = partition.getBucketCount();
+      if (numBucketsDest != numBuckets) {
+        return false;
+      }
+
+      List<Integer> partnBucketPositions =
+          getBucketPositions(partition.getBucketCols(), partition.getTable().getCols());
+      List<ObjectPair<Integer, Integer>> partnSortPositions =
+          getSortPositions(partition.getSortCols(), partition.getTable().getCols());
+      return bucketPositionsDest.equals(partnBucketPositions) &&
+          sortPositionsDest.equals(partnSortPositions);
+    }
+
+    // Return true if the table is bucketed/sorted by the specified positions
+    // The number of buckets and the sort order should also match, along with the
+    // columns which are bucketed/sorted
+    private boolean checkTable(Table table,
+        List<Integer> bucketPositionsDest,
+        List<ObjectPair<Integer, Integer>> sortPositionsDest,
+        int numBucketsDest) {
+      // The bucketing and sorting positions should exactly match
+      int numBuckets = table.getNumBuckets();
+      if (numBucketsDest != numBuckets) {
+        return false;
+      }
+
+      List<Integer> tableBucketPositions =
+          getBucketPositions(table.getBucketCols(), table.getCols());
+      List<ObjectPair<Integer, Integer>> tableSortPositions =
+          getSortPositions(table.getSortCols(), table.getCols());
+      return bucketPositionsDest.equals(tableBucketPositions) &&
+          sortPositionsDest.equals(tableSortPositions);
+    }
+
+    private void storeBucketPathMapping(TableScanOperator tsOp, FileStatus[] srcs) {
+      Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>();
+      for (int pos = 0; pos < srcs.length; pos++) {
+        bucketFileNameMapping.put(srcs[pos].getPath().getName(), pos);
+      }
+      tsOp.getConf().setBucketFileNameMapping(bucketFileNameMapping);
+    }
+
+    // Remove the reduceSinkOperator.
+    // The optimizer will automatically convert it to a map-only job.
+    private void removeReduceSink(ReduceSinkOperator rsOp,
+        TableScanOperator tsOp,
+        FileSinkOperator fsOp,
+        FileStatus[] srcs) {
+      if (srcs == null) {
+        return;
+      }
+
+      removeReduceSink(rsOp, tsOp, fsOp);
+      // Store the mapping from bucket file name to bucket number.
+      // This is needed since for the map-only job, any mapper can process any file.
+      // For example, if mapper 1 is processing the file corresponding to bucket 2, it should
+      // also output the file corresponding to bucket 2 of the output.
+      storeBucketPathMapping(tsOp, srcs);
+    }
+
+    // Remove the reduce sink operator
+    // Use bucketized hive input format so that one mapper processes exactly one file
+    private void removeReduceSink(ReduceSinkOperator rsOp,
+        TableScanOperator tsOp,
+        FileSinkOperator fsOp) {
+      Operator<? extends OperatorDesc> parRSOp = rsOp.getParentOperators().get(0);
+      parRSOp.getChildOperators().set(0, fsOp);
+      fsOp.getParentOperators().set(0, parRSOp);
+      fsOp.getConf().setMultiFileSpray(false);
+      fsOp.getConf().setTotalFiles(1);
+      fsOp.getConf().setNumFiles(1);
+      tsOp.setUseBucketizedHiveInputFormat(true);
+    }
+
+    private int findColumnPosition(List<FieldSchema> cols, String colName) {
+      int pos = 0;
+      for (FieldSchema col : cols) {
+        if (colName.equals(col.getName())) {
+          return pos;
+        }
+        pos++;
+      }
+      return -1;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      // If the reduce sink has not been introduced due to bucketing/sorting, ignore it
+      FileSinkOperator fsOp = (FileSinkOperator) nd;
+      ExtractOperator exOp = (ExtractOperator) fsOp.getParentOperators().get(0);
+      ReduceSinkOperator rsOp = (ReduceSinkOperator) exOp.getParentOperators().get(0);
+
+      List<ReduceSinkOperator> rsOps = pGraphContext
+          .getReduceSinkOperatorsAddedByEnforceBucketingSorting();
+      // nothing to do
+      if ((rsOps != null) && (!rsOps.contains(rsOp))) {
+        return null;
+      }
+
+      // Support for dynamic partitions can be added later
+      if (fsOp.getConf().getDynPartCtx() != null) {
+        return null;
+      }
+
+      // No conversion is possible for the reduce keys
+      for (ExprNodeDesc keyCol : rsOp.getConf().getKeyCols()) {
+        if (!(keyCol instanceof ExprNodeColumnDesc)) {
+          return null;
+        }
+      }
+
+      Table destTable = pGraphContext.getFsopToTable().get(fsOp);
+      if (destTable == null) {
+        return null;
+      }
+
+      // Get the positions for sorted and bucketed columns
+      // For sorted columns, also get the order (ascending/descending) - that should
+      // also match for this to be converted to a map-only job.
+      List<Integer> bucketPositions =
+          getBucketPositions(destTable.getBucketCols(), destTable.getCols());
+      List<ObjectPair<Integer, Integer>> sortPositions =
+          getSortPositions(destTable.getSortCols(), destTable.getCols());
+
+      // Only selects and filters are allowed
+      Operator<? extends OperatorDesc> op = rsOp;
+      // TableScan will also be followed by a Select Operator. Find the expressions for the
+      // bucketed/sorted columns for the destination table
+      List<ExprNodeColumnDesc> sourceTableBucketCols = new ArrayList<ExprNodeColumnDesc>();
+      List<ExprNodeColumnDesc> sourceTableSortCols = new ArrayList<ExprNodeColumnDesc>();
+
+      while (true) {
+        if (op.getParentOperators().size() > 1) {
+          return null;
+        }
+
+        op = op.getParentOperators().get(0);
+        if (!(op instanceof TableScanOperator) &&
+            !(op instanceof FilterOperator) &&
+            !(op instanceof SelectOperator)) {
+          return null;
+        }
+
+        // nothing to be done for filters - the output schema does not change.
+        if (op instanceof TableScanOperator) {
+          Table srcTable = pGraphContext.getTopToTable().get(op);
+
+          // Find the positions of the bucketed columns in the table corresponding
+          // to the select list.
+          // Consider the following scenario:
+          // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
+          // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
+          // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
+          // should be optimized.
+
+          // Start with the destination: T2, bucketed/sorted position is [1]
+          // At the source T1, the column corresponding to that position is [key], which
+          // maps to column [0] of T1, which is also bucketed/sorted into the same
+          // number of buckets
+          List<Integer> newBucketPositions = new ArrayList<Integer>();
+          for (int pos = 0; pos < bucketPositions.size(); pos++) {
+            ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
+            String colName = col.getColumn();
+            int bucketPos = findColumnPosition(srcTable.getCols(), colName);
+            if (bucketPos < 0) {
+              return null;
+            }
+            newBucketPositions.add(bucketPos);
+          }
+
+          // Find the positions/order of the sorted columns in the table corresponding
+          // to the select list.
+          List<ObjectPair<Integer, Integer>> newSortPositions =
+              new ArrayList<ObjectPair<Integer, Integer>>();
+          for (int pos = 0; pos < sortPositions.size(); pos++) {
+            ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
+            String colName = col.getColumn();
+            int sortPos = findColumnPosition(srcTable.getCols(), colName);
+            if (sortPos < 0) {
+              return null;
+            }
+            newSortPositions.add(
+                new ObjectPair<Integer, Integer>(sortPos, sortPositions.get(pos).getSecond()));
+          }
+
+
+          if (srcTable.isPartitioned()) {
+            PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
+            List<Partition> partitions = prunedParts.getNotDeniedPartns();
+
+            // Support for dynamic partitions can be added later
+            // The following is not optimized:
+            // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
+            // where T1 and T2 are bucketed by the same keys and partitioned by ds, hr
+            if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+              return null;
+            }
+            for (Partition partition : partitions) {
+              if (!checkPartition(partition, newBucketPositions, newSortPositions,
+                  pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
+                return null;
+              }
+            }
+
+            removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
+                partitions.get(0).getSortedPaths());
+            return null;
+          }
+          else {
+            if (!checkTable(srcTable, newBucketPositions, newSortPositions,
+                pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
+              return null;
+            }
+
+            removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
+            return null;
+          }
+        }
+        // None of the operators is changing the positions
+        else if (op instanceof SelectOperator) {
+          SelectOperator selectOp = (SelectOperator) op;
+          SelectDesc selectDesc = selectOp.getConf();
+
+          // There may be multiple selects - choose the one closest to the table
+          sourceTableBucketCols.clear();
+          sourceTableSortCols.clear();
+
+          // Only columns can be selected for both sorted and bucketed positions
+          for (int pos : bucketPositions) {
+            ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+            if (!(selectColList instanceof ExprNodeColumnDesc)) {
+              return null;
+            }
+            sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
+          }
+
+          for (ObjectPair<Integer, Integer> pos : sortPositions) {
+            ExprNodeDesc selectColList = selectDesc.getColList().get(pos.getFirst());
+            if (!(selectColList instanceof ExprNodeColumnDesc)) {
+              return null;
+            }
+            sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
+          }
+        }
+      }
+    }
+  }
+}
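
The core of the new optimizer is the remapping walked through in the comments of process(): take the destination table's bucket/sort positions, follow them through the select list, and check that they land on plain columns of the source table. A self-contained sketch of just that index arithmetic (table and column names are taken from the comment's T1/T2 example; everything else is illustrative, not committed code):

// Illustrative only -- not committed code.
import java.util.Arrays;
import java.util.List;

public class BucketPositionRemapDemo {
  // simplified stand-in for findColumnPosition() in the optimizer
  static int findColumnPosition(List<String> cols, String colName) {
    return cols.indexOf(colName);
  }

  public static void main(String[] args) {
    // T1(key, value1, value2) and T2(dummy, key, value1, value2), both bucketed/sorted on key
    List<String> srcCols = Arrays.asList("key", "value1", "value2");
    List<String> destCols = Arrays.asList("dummy", "key", "value1", "value2");
    // select list of: insert overwrite table T2 select 1, key, value1, value2 from T1
    // (null stands for the constant 1, i.e. a non-column expression)
    String[] selectList = {null, "key", "value1", "value2"};

    int destBucketPos = destCols.indexOf("key");                 // 1
    String selectedCol = selectList[destBucketPos];              // "key"
    int srcBucketPos = findColumnPosition(srcCols, selectedCol); // 0
    System.out.println("destination bucket position " + destBucketPos
        + " maps to source column position " + srcBucketPos);
    // a result of -1 (an expression rather than a column) means the reduce sink must stay
  }
}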

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Wed Apr  3 07:05:40 2013
@@ -90,6 +90,10 @@ public class Optimizer {
       transformations.add(new SortedMergeBucketMapJoinOptimizer());
     }
 
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEBUCKETINGSORTING)) {
+      transformations.add(new BucketingSortingReduceSinkOptimizer());
+    }
+
     transformations.add(new UnionProcessor());
     transformations.add(new JoinReorder());
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Wed Apr  3 07:05:40 2013
@@ -30,10 +30,12 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -74,6 +76,8 @@ public class ParseContext {
   private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
   private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private HashMap<TableScanOperator, Table> topToTable;
+  private Map<FileSinkOperator, Table> fsopToTable;
+  private List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
   private HashMap<String, SplitSample> nameToSplitSample;
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
@@ -164,6 +168,7 @@ public class ParseContext {
       Map<JoinOperator, QBJoinTree> joinContext,
       Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext,
       HashMap<TableScanOperator, Table> topToTable,
+      Map<FileSinkOperator, Table> fsopToTable,
       List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
       Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
       UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
@@ -174,7 +179,8 @@ public class ParseContext {
       HashMap<String, SplitSample> nameToSplitSample,
       HashSet<ReadEntity> semanticInputs, List<Task<? extends Serializable>> rootTasks,
       Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner,
-      Map<String, ReadEntity> viewAliasToInput) {
+      Map<String, ReadEntity> viewAliasToInput,
+      List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting) {
     this.conf = conf;
     this.qb = qb;
     this.ast = ast;
@@ -183,6 +189,7 @@ public class ParseContext {
     this.joinContext = joinContext;
     this.smbMapJoinContext = smbMapJoinContext;
     this.topToTable = topToTable;
+    this.fsopToTable = fsopToTable;
     this.loadFileWork = loadFileWork;
     this.loadTableWork = loadTableWork;
     this.opParseCtx = opParseCtx;
@@ -203,6 +210,8 @@ public class ParseContext {
     this.rootTasks = rootTasks;
     this.opToPartToSkewedPruner = opToPartToSkewedPruner;
     this.viewAliasToInput = viewAliasToInput;
+    this.reduceSinkOperatorsAddedByEnforceBucketingSorting =
+        reduceSinkOperatorsAddedByEnforceBucketingSorting;
   }
 
   /**
@@ -304,6 +313,24 @@ public class ParseContext {
     this.topToTable = topToTable;
   }
 
+  public Map<FileSinkOperator, Table> getFsopToTable() {
+    return fsopToTable;
+  }
+
+  public void setFsopToTable(Map<FileSinkOperator, Table> fsopToTable) {
+    this.fsopToTable = fsopToTable;
+  }
+
+  public List<ReduceSinkOperator> getReduceSinkOperatorsAddedByEnforceBucketingSorting() {
+    return reduceSinkOperatorsAddedByEnforceBucketingSorting;
+  }
+
+  public void setReduceSinkOperatorsAddedByEnforceBucketingSorting(
+      List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting) {
+    this.reduceSinkOperatorsAddedByEnforceBucketingSorting =
+        reduceSinkOperatorsAddedByEnforceBucketingSorting;
+  }
+
   /**
    * @return the topOps
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java Wed Apr  3 07:05:40 2013
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadat
 public class PrunedPartitionList {
 
   // source table
-  private Table source;
+  private final Table source;
 
   // confirmed partitions - satisfy the partition criteria
   private Set<Partition> confirmedPartns;
@@ -44,7 +44,7 @@ public class PrunedPartitionList {
 
   /**
    * @param confirmedPartns
-   *          confirmed paritions
+   *          confirmed partitions
    * @param unknownPartns
    *          unknown partitions
    */
@@ -62,7 +62,7 @@ public class PrunedPartitionList {
 
   /**
    * get confirmed partitions.
-   * 
+   *
    * @return confirmedPartns confirmed paritions
    */
   public Set<Partition> getConfirmedPartns() {
@@ -71,7 +71,7 @@ public class PrunedPartitionList {
 
   /**
    * get unknown partitions.
-   * 
+   *
    * @return unknownPartns unknown paritions
    */
   public Set<Partition> getUnknownPartns() {
@@ -80,7 +80,7 @@ public class PrunedPartitionList {
 
   /**
    * get denied partitions.
-   * 
+   *
    * @return deniedPartns denied paritions
    */
   public Set<Partition> getDeniedPartns() {
@@ -99,7 +99,7 @@ public class PrunedPartitionList {
 
   /**
    * set confirmed partitions.
-   * 
+   *
    * @param confirmedPartns
    *          confirmed paritions
    */
@@ -109,7 +109,7 @@ public class PrunedPartitionList {
 
   /**
    * set unknown partitions.
-   * 
+   *
    * @param unknownPartns
    *          unknown partitions
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Apr  3 07:05:40 2013
@@ -223,6 +223,8 @@ public class SemanticAnalyzer extends Ba
   private Map<JoinOperator, QBJoinTree> joinContext;
   private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private final HashMap<TableScanOperator, Table> topToTable;
+  private final Map<FileSinkOperator, Table> fsopToTable;
+  private final List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
   private QB qb;
   private ASTNode ast;
   private int destTableId;
@@ -284,6 +286,8 @@ public class SemanticAnalyzer extends Ba
     joinContext = new HashMap<JoinOperator, QBJoinTree>();
     smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
     topToTable = new HashMap<TableScanOperator, Table>();
+    fsopToTable = new HashMap<FileSinkOperator, Table>();
+    reduceSinkOperatorsAddedByEnforceBucketingSorting = new ArrayList<ReduceSinkOperator>();
     destTableId = 1;
     uCtx = null;
     listMapJoinOpsNoReducer = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
@@ -342,11 +346,13 @@ public class SemanticAnalyzer extends Ba
 
   public ParseContext getParseContext() {
     return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
-        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, loadTableWork,
+        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable,
+        fsopToTable, loadTableWork,
         loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
-        opToPartToSkewedPruner, viewAliasToInput);
+        opToPartToSkewedPruner, viewAliasToInput,
+        reduceSinkOperatorsAddedByEnforceBucketingSorting);
   }
 
   @SuppressWarnings("nls")
@@ -5262,6 +5268,7 @@ public class SemanticAnalyzer extends Ba
           + dest_path + " row schema: " + inputRR.toString());
     }
 
+    fsopToTable.put((FileSinkOperator) output, dest_tab);
     return output;
   }
 
@@ -5669,6 +5676,7 @@ public class SemanticAnalyzer extends Ba
             partitionCols, order.toString(), numReducers),
         new RowSchema(inputRR.getColumnInfos()), input), inputRR);
     interim.setColumnExprMap(colExprMap);
+    reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator) interim);
 
     // Add the extract operator to get the value fields
     RowResolver out_rwsch = new RowResolver();
@@ -5691,6 +5699,7 @@ public class SemanticAnalyzer extends Ba
       LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() +
           " row schema: " + out_rwsch.toString());
     }
+
     return output;
 
   }
@@ -8646,11 +8655,12 @@ public class SemanticAnalyzer extends Ba
 
     ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
         opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext,
-        topToTable,
+        topToTable, fsopToTable,
         loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
-        opToPartToSkewedPruner, viewAliasToInput);
+        opToPartToSkewedPruner, viewAliasToInput,
+        reduceSinkOperatorsAddedByEnforceBucketingSorting);
 
     // Generate table access stats if required
     if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Wed Apr  3 07:05:40 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 
@@ -46,7 +47,7 @@ public class TableScanDesc extends Abstr
   /**
    * Used for split sampling (row count per split)
    * For example,
-   *   select count(1) from ss_src2 tablesample(10 ROWS);
+   *   select count(1) from ss_src2 tablesample (10 ROWS) s;
    * provides first 10 rows from all input splits
    */
   private int rowLimit = -1;
@@ -67,6 +68,9 @@ public class TableScanDesc extends Abstr
   public static final String FILTER_TEXT_CONF_STR =
     "hive.io.filter.text";
 
+  // input file name (big) to bucket number
+  private Map<String, Integer> bucketFileNameMapping;
+
   @SuppressWarnings("nls")
   public TableScanDesc() {
   }
@@ -170,4 +174,12 @@ public class TableScanDesc extends Abstr
   public Integer getRowLimitExplain() {
     return rowLimit >= 0 ? rowLimit : null;
   }
+
+  public Map<String, Integer> getBucketFileNameMapping() {
+    return bucketFileNameMapping;
+  }
+
+  public void setBucketFileNameMapping(Map<String, Integer> bucketFileNameMapping) {
+    this.bucketFileNameMapping = bucketFileNameMapping;
+  }
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q Wed Apr  3 07:05:40 2013
@@ -0,0 +1,65 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false; 
+
+-- Create two bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 PARTITION (ds = '1') SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+select count(*) from test_table1 where ds = '1';
+select count(*) from test_table1 where ds = '1' and hash(key) % 2 = 0;
+select count(*) from test_table1 where ds = '1' and hash(key) % 2 = 1;
+select count(*) from test_table1 tablesample (bucket 1 out of 2) s where ds = '1';
+select count(*) from test_table1 tablesample (bucket 2 out of 2) s where ds = '1';
+
+select count(*) from test_table2 where ds = '1';
+select count(*) from test_table2 where ds = '1' and hash(key) % 2 = 0;
+select count(*) from test_table2 where ds = '1' and hash(key) % 2 = 1;
+select count(*) from test_table2 tablesample (bucket 1 out of 2) s where ds = '1';
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s where ds = '1';
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation, one of the buckets should be empty
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '2')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1' and a.key = 238;
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '2')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1' and a.key = 238;
+
+select count(*) from test_table2 where ds = '2';
+select count(*) from test_table2 where ds = '2' and hash(key) % 2 = 0;
+select count(*) from test_table2 where ds = '2' and hash(key) % 2 = 1;
+select count(*) from test_table2 tablesample (bucket 1 out of 2) s where ds = '2';
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s where ds = '2';
+
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '3')
+SELECT a.key, a.value FROM test_table2 a WHERE a.ds = '2';
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '3')
+SELECT a.key, a.value FROM test_table2 a WHERE a.ds = '2';
+
+select count(*) from test_table2 where ds = '3';
+select count(*) from test_table2 where ds = '3' and hash(key) % 2 = 0;
+select count(*) from test_table2 where ds = '3' and hash(key) % 2 = 1;
+select count(*) from test_table2 tablesample (bucket 1 out of 2) s where ds = '3';
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s where ds = '3';

Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q Wed Apr  3 07:05:40 2013
@@ -0,0 +1,41 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false; 
+
+-- Create two bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 16 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 16 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 PARTITION (ds = '1') SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+select count(*) from test_table1 where ds = '1';
+select count(*) from test_table1 where ds = '1' and hash(key) % 16 = 0;
+select count(*) from test_table1 where ds = '1' and hash(key) % 16 = 5;
+select count(*) from test_table1 where ds = '1' and hash(key) % 16 = 12;
+select count(*) from test_table1 tablesample (bucket 1 out of 16) s where ds = '1';
+select count(*) from test_table1 tablesample (bucket 6 out of 16) s where ds = '1';
+select count(*) from test_table1 tablesample (bucket 13 out of 16) s where ds = '1';
+
+select count(*) from test_table2 where ds = '1';
+select count(*) from test_table2 where ds = '1' and hash(key) % 16 = 0;
+select count(*) from test_table2 where ds = '1' and hash(key) % 16 = 5;
+select count(*) from test_table2 where ds = '1' and hash(key) % 16 = 12;
+select count(*) from test_table2 tablesample (bucket 1 out of 16) s where ds = '1';
+select count(*) from test_table2 tablesample (bucket 6 out of 16) s where ds = '1';
+select count(*) from test_table2 tablesample (bucket 13 out of 16) s where ds = '1';

Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q Wed Apr  3 07:05:40 2013
@@ -0,0 +1,53 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false; 
+
+-- Create two bucketed and sorted tables
+CREATE TABLE test_table1 (key int, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key STRING, value1 STRING, value2 string) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 PARTITION (ds = '1') SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- with different datatypes. This should be a map-reduce operation
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value, a.value FROM test_table1 a WHERE a.ds = '1';
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value, a.value FROM test_table1 a WHERE a.ds = '1';
+
+select count(*) from test_table2 where ds = '1';
+select count(*) from test_table2 where ds = '1' and hash(key) % 2 = 0;
+select count(*) from test_table2 where ds = '1' and hash(key) % 2 = 1;
+
+CREATE TABLE test_table3 (key STRING, value1 int, value2 string) PARTITIONED BY (ds STRING)
+CLUSTERED BY (value1) SORTED BY (value1) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation, although the bucketing positions don't match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1')
+SELECT a.value, a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1')
+SELECT a.value, a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+select count(*) from test_table3 where ds = '1';
+select count(*) from test_table3 where ds = '1' and hash(value1) % 2 = 0;
+select count(*) from test_table3 where ds = '1' and hash(value1) % 2 = 1;
+select count(*) from test_table3 tablesample (bucket 1 out of 2) s where ds = '1';
+select count(*) from test_table3 tablesample (bucket 2 out of 2) s where ds = '1';
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- However, since an expression is being selected, it should involve a reducer
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '2')
+SELECT a.key+a.key, a.value, a.value FROM test_table1 a WHERE a.ds = '1';

Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q Wed Apr  3 07:05:40 2013
@@ -0,0 +1,77 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false; 
+
+-- Create two bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 PARTITION (ds = '1') SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key desc) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the sort orders do not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key, value) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the sort columns do not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (value) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the sort columns do not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the number of buckets does not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since sort columns do not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';

Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q Wed Apr  3 07:05:40 2013
@@ -0,0 +1,55 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false; 
+
+-- Create two bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation
+EXPLAIN INSERT OVERWRITE TABLE test_table2
+SELECT * FROM test_table1;
+
+INSERT OVERWRITE TABLE test_table2
+SELECT * FROM test_table1;
+
+select count(*) from test_table1;
+select count(*) from test_table1 tablesample (bucket 2 out of 2) s;
+
+select count(*) from test_table2;
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s;
+
+drop table test_table1;
+drop table test_table2;
+
+CREATE TABLE test_table1 (key INT, value STRING)
+CLUSTERED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING)
+CLUSTERED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation
+EXPLAIN INSERT OVERWRITE TABLE test_table2
+SELECT * FROM test_table1;
+
+INSERT OVERWRITE TABLE test_table2
+SELECT * FROM test_table1;
+
+select count(*) from test_table1;
+select count(*) from test_table1 tablesample (bucket 2 out of 2) s;
+
+select count(*) from test_table2;
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s;