You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2013/04/03 09:05:41 UTC
svn commit: r1463841 [1/3] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/metadata/ ql/src/java/org/a...
Author: namit
Date: Wed Apr 3 07:05:40 2013
New Revision: 1463841
URL: http://svn.apache.org/r1463841
Log:
HIVE-4240 optimize hive.enforce.bucketing and hive.enforce.sorting insert
(Gang Tim Liu via namit)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q
hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q
hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q
hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q
hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_18.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_19.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_20.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_21.q.out
hive/trunk/ql/src/test/results/clientpositive/smb_mapjoin_22.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ObjectPair.java Wed Apr 3 07:05:40 2013
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.common;
+
+
public class ObjectPair<F, S> {
private F first;
private S second;
@@ -44,4 +46,45 @@ public class ObjectPair<F, S> {
public void setSecond(S second) {
this.second = second;
}
+
+ /**
+ * Two pairs are equal when their first elements are equal and their second
+ * elements are equal; null elements are supported and only equal null.
+ */
+ @Override
+ public boolean equals(Object that) {
+ if (this == that) {
+ return true;
+ }
+ if (!(that instanceof ObjectPair)) {
+ // Also covers that == null: null is never an instance of ObjectPair.
+ return false;
+ }
+ ObjectPair<?, ?> other = (ObjectPair<?, ?>) that;
+ return nullSafeEquals(this.getFirst(), other.getFirst()) &&
+ nullSafeEquals(this.getSecond(), other.getSecond());
+ }
+
+ /**
+ * Type-specific overload kept for source compatibility with existing callers;
+ * it has the same semantics as {@link #equals(Object)}.
+ */
+ public boolean equals(ObjectPair<F, S> that) {
+ return this.equals((Object) that);
+ }
+
+ /**
+ * Consistent with {@link #equals(Object)}, so pairs can be used as keys in
+ * hash-based collections.
+ */
+ @Override
+ public int hashCode() {
+ int result = (getFirst() == null) ? 0 : getFirst().hashCode();
+ return 31 * result + ((getSecond() == null) ? 0 : getSecond().hashCode());
+ }
+
+ /** Null-safe equality helper. */
+ private static boolean nullSafeEquals(Object a, Object b) {
+ return (a == null) ? (b == null) : a.equals(b);
+ }
}
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Apr 3 07:05:40 2013
@@ -513,6 +513,7 @@ public class HiveConf extends Configurat
HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
HIVEENFORCESORTING("hive.enforce.sorting", false),
+ HIVEOPTIMIZEBUCKETINGSORTING("hive.optimize.bucketingsorting", true),
HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
HIVEENFORCESORTMERGEBUCKETMAPJOIN("hive.enforce.sortmergebucketmapjoin", false),
HIVEENFORCEBUCKETMAPJOIN("hive.enforce.bucketmapjoin", false),
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Wed Apr 3 07:05:40 2013
@@ -933,6 +933,16 @@
</property>
<property>
+ <name>hive.optimize.bucketingsorting</name>
+ <value>true</value>
+ <description>If hive.enforce.bucketing or hive.enforce.sorting is true, don't create a reducer for enforcing
+ bucketing/sorting for queries of the form:
+ insert overwrite table T2 select * from T1;
+ where T1 and T2 are bucketed/sorted by the same keys into the same number of buckets.
+ </description>
+</property>
+
+<property>
<name>hive.enforce.sortmergebucketmapjoin</name>
<value>false</value>
<description>If the user asked for sort-merge bucketed map-side join, and it cannot be performed,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Wed Apr 3 07:05:40 2013
@@ -94,6 +94,25 @@ public class TableScanOperator extends O
@Override
public void cleanUpInputFileChangedOp() throws HiveException {
inputFileChanged = true;
+ // If the file name to bucket number mapping is maintained, store the bucket number
+ // in the execution context. This is needed for the following scenario:
+ // insert overwrite table T1 select * from T2;
+ // where T1 and T2 are sorted/bucketed by the same keys into the same number of buckets
+ // Although one mapper per file is used (BucketizedHiveInputFormat), it is possible that
+ // any mapper can pick up any file (depending on the size of the files). The bucket number
+ // corresponding to the input file is stored to name the output bucket file appropriately.
+ Map<String, Integer> bucketNameMapping = conf != null ? conf.getBucketFileNameMapping() : null;
+ if ((bucketNameMapping != null) && (!bucketNameMapping.isEmpty())) {
+ String currentInputFile = getExecContext().getCurrentInputFile();
+ String fileName = Utilities.getFileNameFromDirName(currentInputFile);
+ Integer bucketNum = bucketNameMapping.get(fileName);
+ if (bucketNum == null) {
+ // Guessing a file id would name the output bucket incorrectly, so fail explicitly
+ // instead of hitting a NullPointerException while unboxing.
+ throw new HiveException("No bucket number found for input file " + currentInputFile);
+ }
+ getExecContext().setFileId(Integer.toString(bucketNum));
+ }
}
private void gatherStats(Object row) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Wed Apr 3 07:05:40 2013
@@ -384,11 +384,10 @@ public class Partition implements Serial
}
/**
- * mapping from bucket number to bucket path
+ * get all paths for this partition in a sorted manner
*/
- // TODO: add test case and clean it up
@SuppressWarnings("nls")
- public Path getBucketPath(int bucketNum) {
+ public FileStatus[] getSortedPaths() {
try {
// Previously, this got the filesystem of the Table, which could be
// different from the filesystem of the partition.
@@ -407,11 +406,31 @@ public class Partition implements Serial
if (srcs.length == 0) {
return null;
}
- return srcs[bucketNum].getPath();
+ return srcs;
} catch (Exception e) {
- throw new RuntimeException("Cannot get bucket path for bucket "
- + bucketNum, e);
+ throw new RuntimeException("Cannot get path ", e);
+ }
+ }
+
+ /**
+ * Mapping from bucket number to bucket path.
+ *
+ * @param bucketNum index of the bucket into the sorted file list of this partition
+ * @return the path of the file for the bucket, or null if getSortedPaths() finds no files
+ * @throws RuntimeException if bucketNum does not index an existing file
+ */
+ // TODO: add test case and clean it up
+ @SuppressWarnings("nls")
+ public Path getBucketPath(int bucketNum) {
+ FileStatus[] srcs = getSortedPaths();
+ if (srcs == null) {
+ return null;
+ }
+ if (bucketNum < 0 || bucketNum >= srcs.length) {
+ // Same message the previous implementation reported for an invalid bucket.
+ throw new RuntimeException("Cannot get bucket path for bucket " + bucketNum);
+ }
+ return srcs[bucketNum].getPath();
}
@SuppressWarnings("nls")
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Wed Apr 3 07:05:40 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
@@ -925,4 +927,37 @@ public class Table implements Serializab
Hive hive = Hive.get();
return hive.getIndexes(getTTable().getDbName(), getTTable().getTableName(), max);
}
+
+ /**
+ * Get the file statuses matched under this table's location, in sorted order.
+ * For a bucketed table the pattern is "location/*" (its files); otherwise the
+ * location itself is matched.
+ *
+ * @return the sorted statuses, or null if nothing matches
+ */
+ @SuppressWarnings("nls")
+ public FileStatus[] getSortedPaths() {
+ try {
+ // Use the filesystem that owns the table location, not the default filesystem.
+ FileSystem fs = FileSystem.get(getPath().toUri(), Hive.get()
+ .getConf());
+ String pathPattern = getPath().toString();
+ if (getNumBuckets() > 0) {
+ pathPattern = pathPattern + "/*";
+ }
+ LOG.info("Path pattern = " + pathPattern);
+ FileStatus[] srcs = fs.globStatus(new Path(pathPattern));
+ // globStatus can return null (e.g. for a non-glob path that does not exist).
+ if (srcs == null || srcs.length == 0) {
+ return null;
+ }
+ Arrays.sort(srcs);
+ for (FileStatus src : srcs) {
+ LOG.info("Got file: " + src.getPath());
+ }
+ return srcs;
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot get path ", e);
+ }
+ }
};
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Wed Apr 3 07:05:40 2013
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * This transformation does optimization for enforcing bucketing and sorting.
+ * For a query of the form:
+ * insert overwrite table T1 select * from T2;
+ * where T1 and T2 are bucketed/sorted on the same keys into the same number of buckets,
+ * we don't need a reducer to enforce bucketing and sorting.
+ */
+public class BucketingSortingReduceSinkOptimizer implements Transform {
+
+ private static final Log LOG = LogFactory.getLog(BucketingSortingReduceSinkOptimizer.class
+ .getName());
+
+ public BucketingSortingReduceSinkOptimizer() {
+ }
+
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+ // process reduce sink added by hive.enforce.bucketing or hive.enforce.sorting
+ opRules.put(new RuleRegExp("R1",
+ ReduceSinkOperator.getOperatorName() + "%" +
+ ExtractOperator.getOperatorName() + "%" +
+ FileSinkOperator.getOperatorName() + "%"),
+ getBucketSortReduceSinkProc(pctx));
+
+ // The dispatcher fires the processor corresponding to the closest matching rule
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, null);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of top operator nodes
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pctx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+
+ return pctx;
+ }
+
+ private NodeProcessor getDefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ return null;
+ }
+ };
+ }
+
+ private NodeProcessor getBucketSortReduceSinkProc(ParseContext pctx) {
+ return new BucketSortReduceSinkProcessor(pctx);
+ }
+
+ /**
+ * BucketSortReduceSinkProcessor.
+ *
+ */
+ public class BucketSortReduceSinkProcessor implements NodeProcessor {
+
+ protected ParseContext pGraphContext;
+
+ public BucketSortReduceSinkProcessor(ParseContext pGraphContext) {
+ this.pGraphContext = pGraphContext;
+ }
+
+ // Get the positions of tabBucketCols within tabCols; names not found in tabCols are skipped
+ private List<Integer> getBucketPositions(List<String> tabBucketCols,
+ List<FieldSchema> tabCols) {
+ List<Integer> posns = new ArrayList<Integer>();
+ for (String bucketCol : tabBucketCols) {
+ int pos = 0;
+ for (FieldSchema tabCol : tabCols) {
+ if (bucketCol.equals(tabCol.getName())) {
+ posns.add(pos);
+ break;
+ }
+ pos++;
+ }
+ }
+ return posns;
+ }
+
+ // Get the positions (within tabCols) and sort order of tabSortCols; unknown names are skipped
+ private List<ObjectPair<Integer, Integer>> getSortPositions(List<Order> tabSortCols,
+ List<FieldSchema> tabCols) {
+ List<ObjectPair<Integer, Integer>> posns = new ArrayList<ObjectPair<Integer, Integer>>();
+ for (Order sortCol : tabSortCols) {
+ int pos = 0;
+ for (FieldSchema tabCol : tabCols) {
+ if (sortCol.getCol().equals(tabCol.getName())) {
+ posns.add(new ObjectPair<Integer, Integer>(pos, sortCol.getOrder()));
+ break;
+ }
+ pos++;
+ }
+ }
+ return posns;
+ }
+
+ // Return true if the partition is bucketed/sorted by the specified positions
+ // The number of buckets, the sort order should also match along with the
+ // columns which are bucketed/sorted
+ private boolean checkPartition(Partition partition,
+ List<Integer> bucketPositionsDest,
+ List<ObjectPair<Integer, Integer>> sortPositionsDest,
+ int numBucketsDest) {
+ // The bucketing and sorting positions should exactly match
+ int numBuckets = partition.getBucketCount();
+ if (numBucketsDest != numBuckets) {
+ return false;
+ }
+
+ List<Integer> partnBucketPositions =
+ getBucketPositions(partition.getBucketCols(), partition.getTable().getCols());
+ List<ObjectPair<Integer, Integer>> partnSortPositions =
+ getSortPositions(partition.getSortCols(), partition.getTable().getCols());
+ return bucketPositionsDest.equals(partnBucketPositions) &&
+ sortPositionsDest.equals(partnSortPositions);
+ }
+
+ // Return true if the table is bucketed/sorted by the specified positions
+ // The number of buckets, the sort order should also match along with the
+ // columns which are bucketed/sorted
+ private boolean checkTable(Table table,
+ List<Integer> bucketPositionsDest,
+ List<ObjectPair<Integer, Integer>> sortPositionsDest,
+ int numBucketsDest) {
+ // The bucketing and sorting positions should exactly match
+ int numBuckets = table.getNumBuckets();
+ if (numBucketsDest != numBuckets) {
+ return false;
+ }
+
+ List<Integer> tableBucketPositions =
+ getBucketPositions(table.getBucketCols(), table.getCols());
+ List<ObjectPair<Integer, Integer>> tableSortPositions =
+ getSortPositions(table.getSortCols(), table.getCols());
+ return bucketPositionsDest.equals(tableBucketPositions) &&
+ sortPositionsDest.equals(tableSortPositions);
+ }
+
+ private void storeBucketPathMapping(TableScanOperator tsOp, FileStatus[] srcs) {
+ Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>();
+ for (int pos = 0; pos < srcs.length; pos++) {
+ bucketFileNameMapping.put(srcs[pos].getPath().getName(), pos);
+ }
+ tsOp.getConf().setBucketFileNameMapping(bucketFileNameMapping);
+ }
+
+ // Remove the reduceSinkOperator.
+ // The job becomes map-only; nothing is changed when there are no source files.
+ private void removeReduceSink(ReduceSinkOperator rsOp,
+ TableScanOperator tsOp,
+ FileSinkOperator fsOp,
+ FileStatus[] srcs) {
+ if (srcs == null) {
+ return;
+ }
+
+ removeReduceSink(rsOp, tsOp, fsOp);
+ // Store the mapping: file name -> bucket number
+ // This is needed since for the map-only job, any mapper can process any file.
+ // For eg: if mapper 1 is processing the file corresponding to bucket 2, it should
+ // also output the file corresponding to bucket 2 of the output.
+ storeBucketPathMapping(tsOp, srcs);
+ }
+
+ // Remove the reduce sink operator
+ // Use bucketized hive input format so that one mapper processes exactly one file
+ private void removeReduceSink(ReduceSinkOperator rsOp,
+ TableScanOperator tsOp,
+ FileSinkOperator fsOp) {
+ Operator<? extends OperatorDesc> parRSOp = rsOp.getParentOperators().get(0);
+ // Replace only the edge to the reduce sink, in case the parent has other children.
+ int rsIndex = parRSOp.getChildOperators().indexOf(rsOp);
+ parRSOp.getChildOperators().set(rsIndex, fsOp);
+ fsOp.getParentOperators().set(0, parRSOp);
+ fsOp.getConf().setMultiFileSpray(false);
+ fsOp.getConf().setTotalFiles(1);
+ fsOp.getConf().setNumFiles(1);
+ tsOp.setUseBucketizedHiveInputFormat(true);
+ }
+
+ // Return the position of colName in cols, or -1 if it is not present
+ private int findColumnPosition(List<FieldSchema> cols, String colName) {
+ int pos = 0;
+ for (FieldSchema col : cols) {
+ if (colName.equals(col.getName())) {
+ return pos;
+ }
+ pos++;
+ }
+ return -1;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+
+ // If the reduce sink has not been introduced due to bucketing/sorting, ignore it
+ FileSinkOperator fsOp = (FileSinkOperator) nd;
+ ExtractOperator exOp = (ExtractOperator) fsOp.getParentOperators().get(0);
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) exOp.getParentOperators().get(0);
+
+ List<ReduceSinkOperator> rsOps = pGraphContext
+ .getReduceSinkOperatorsAddedByEnforceBucketingSorting();
+ // Only a reduce sink known to have been added by hive.enforce.bucketing or
+ // hive.enforce.sorting may be removed; without that list nothing is provably safe.
+ if ((rsOps == null) || (!rsOps.contains(rsOp))) {
+ return null;
+ }
+
+ // Support for dynamic partitions can be added later
+ if (fsOp.getConf().getDynPartCtx() != null) {
+ return null;
+ }
+
+ // No conversion is possible for the reduce keys
+ for (ExprNodeDesc keyCol : rsOp.getConf().getKeyCols()) {
+ if (!(keyCol instanceof ExprNodeColumnDesc)) {
+ return null;
+ }
+ }
+
+ Table destTable = pGraphContext.getFsopToTable().get(fsOp);
+ if (destTable == null) {
+ return null;
+ }
+
+ // Get the positions for sorted and bucketed columns
+ // For sorted columns, also get the order (ascending/descending) - that should
+ // also match for this to be converted to a map-only job.
+ List<Integer> bucketPositions =
+ getBucketPositions(destTable.getBucketCols(), destTable.getCols());
+ List<ObjectPair<Integer, Integer>> sortPositions =
+ getSortPositions(destTable.getSortCols(), destTable.getCols());
+ // Give up if a bucketed/sorted column of the destination could not be resolved.
+ if ((bucketPositions.size() != destTable.getBucketCols().size()) ||
+ (sortPositions.size() != destTable.getSortCols().size())) {
+ return null;
+ }
+
+ // Only selects and filters are allowed
+ Operator<? extends OperatorDesc> op = rsOp;
+ // TableScan will also be followed by a Select Operator. Find the expressions for the
+ // bucketed/sorted columns for the destination table
+ List<ExprNodeColumnDesc> sourceTableBucketCols = new ArrayList<ExprNodeColumnDesc>();
+ List<ExprNodeColumnDesc> sourceTableSortCols = new ArrayList<ExprNodeColumnDesc>();
+
+ while (true) {
+ if (op.getParentOperators().size() > 1) {
+ return null;
+ }
+
+ op = op.getParentOperators().get(0);
+ if (!(op instanceof TableScanOperator) &&
+ !(op instanceof FilterOperator) &&
+ !(op instanceof SelectOperator)) {
+ return null;
+ }
+
+ // nothing to be done for filters - the output schema does not change.
+ if (op instanceof TableScanOperator) {
+ Table srcTable = pGraphContext.getTopToTable().get(op);
+ if (srcTable == null) {
+ return null;
+ }
+
+ // Find the positions of the bucketed columns in the table corresponding
+ // to the select list.
+ // Consider the following scenario:
+ // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
+ // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
+ // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
+ // should be optimized.
+
+ // Start with the destination: T2, bucketed/sorted position is [1]
+ // At the source T1, the column corresponding to that position is [key], which
+ // maps to column [0] of T1, which is also bucketed/sorted into the same
+ // number of buckets
+
+ // The source expressions were collected from a select operator; if none was
+ // seen there is nothing to map the destination positions to.
+ if ((sourceTableBucketCols.size() != bucketPositions.size()) ||
+ (sourceTableSortCols.size() != sortPositions.size())) {
+ return null;
+ }
+
+ List<Integer> newBucketPositions = new ArrayList<Integer>();
+ for (int pos = 0; pos < bucketPositions.size(); pos++) {
+ ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
+ String colName = col.getColumn();
+ int bucketPos = findColumnPosition(srcTable.getCols(), colName);
+ if (bucketPos < 0) {
+ return null;
+ }
+ newBucketPositions.add(bucketPos);
+ }
+
+ // Find the positions/order of the sorted columns in the table corresponding
+ // to the select list.
+ List<ObjectPair<Integer, Integer>> newSortPositions =
+ new ArrayList<ObjectPair<Integer, Integer>>();
+ for (int pos = 0; pos < sortPositions.size(); pos++) {
+ ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
+ String colName = col.getColumn();
+ int sortPos = findColumnPosition(srcTable.getCols(), colName);
+ if (sortPos < 0) {
+ return null;
+ }
+ newSortPositions.add(
+ new ObjectPair<Integer, Integer>(sortPos, sortPositions.get(pos).getSecond()));
+ }
+
+ if (srcTable.isPartitioned()) {
+ PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
+ if (prunedParts == null) {
+ return null;
+ }
+ List<Partition> partitions = prunedParts.getNotDeniedPartns();
+
+ // Only a single source partition is supported for now; reading several
+ // source partitions is not optimized, e.g.:
+ // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
+ // where T1 and T2 are bucketed by the same keys and partitioned by ds, hr
+ if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+ return null;
+ }
+ Partition partition = partitions.get(0);
+ if (!checkPartition(partition, newBucketPositions, newSortPositions,
+ destTable.getNumBuckets())) {
+ return null;
+ }
+
+ removeReduceSink(rsOp, (TableScanOperator) op, fsOp, partition.getSortedPaths());
+ return null;
+ } else {
+ if (!checkTable(srcTable, newBucketPositions, newSortPositions,
+ destTable.getNumBuckets())) {
+ return null;
+ }
+
+ removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
+ return null;
+ }
+ }
+ // None of the operators is changing the positions
+ else if (op instanceof SelectOperator) {
+ SelectOperator selectOp = (SelectOperator) op;
+ SelectDesc selectDesc = selectOp.getConf();
+ List<ExprNodeDesc> colList = selectDesc.getColList();
+
+ // There may be multiple selects - choose the one closest to the table
+ sourceTableBucketCols.clear();
+ sourceTableSortCols.clear();
+
+ // Only columns can be selected for both sorted and bucketed positions
+ for (int pos : bucketPositions) {
+ if (pos >= colList.size()) {
+ return null;
+ }
+ ExprNodeDesc selectColList = colList.get(pos);
+ if (!(selectColList instanceof ExprNodeColumnDesc)) {
+ return null;
+ }
+ sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
+ }
+
+ for (ObjectPair<Integer, Integer> pos : sortPositions) {
+ if (pos.getFirst() >= colList.size()) {
+ return null;
+ }
+ ExprNodeDesc selectColList = colList.get(pos.getFirst());
+ if (!(selectColList instanceof ExprNodeColumnDesc)) {
+ return null;
+ }
+ sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
+ }
+ }
+ }
+ }
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Wed Apr 3 07:05:40 2013
@@ -90,6 +90,10 @@ public class Optimizer {
transformations.add(new SortedMergeBucketMapJoinOptimizer());
}
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEBUCKETINGSORTING)) {
+ transformations.add(new BucketingSortingReduceSinkOptimizer());
+ }
+
transformations.add(new UnionProcessor());
transformations.add(new JoinReorder());
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Wed Apr 3 07:05:40 2013
@@ -30,10 +30,12 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -74,6 +76,8 @@ public class ParseContext {
private Map<MapJoinOperator, QBJoinTree> mapJoinContext;
private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
private HashMap<TableScanOperator, Table> topToTable;
+ private Map<FileSinkOperator, Table> fsopToTable;
+ private List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
private HashMap<String, SplitSample> nameToSplitSample;
private List<LoadTableDesc> loadTableWork;
private List<LoadFileDesc> loadFileWork;
@@ -164,6 +168,7 @@ public class ParseContext {
Map<JoinOperator, QBJoinTree> joinContext,
Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext,
HashMap<TableScanOperator, Table> topToTable,
+ Map<FileSinkOperator, Table> fsopToTable,
List<LoadTableDesc> loadTableWork, List<LoadFileDesc> loadFileWork,
Context ctx, HashMap<String, String> idToTableNameMap, int destTableId,
UnionProcContext uCtx, List<AbstractMapJoinOperator<? extends MapJoinDesc>> listMapJoinOpsNoReducer,
@@ -174,7 +179,8 @@ public class ParseContext {
HashMap<String, SplitSample> nameToSplitSample,
HashSet<ReadEntity> semanticInputs, List<Task<? extends Serializable>> rootTasks,
Map<TableScanOperator, Map<String, ExprNodeDesc>> opToPartToSkewedPruner,
- Map<String, ReadEntity> viewAliasToInput) {
+ Map<String, ReadEntity> viewAliasToInput,
+ List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting) {
this.conf = conf;
this.qb = qb;
this.ast = ast;
@@ -183,6 +189,7 @@ public class ParseContext {
this.joinContext = joinContext;
this.smbMapJoinContext = smbMapJoinContext;
this.topToTable = topToTable;
+ this.fsopToTable = fsopToTable;
this.loadFileWork = loadFileWork;
this.loadTableWork = loadTableWork;
this.opParseCtx = opParseCtx;
@@ -203,6 +210,8 @@ public class ParseContext {
this.rootTasks = rootTasks;
this.opToPartToSkewedPruner = opToPartToSkewedPruner;
this.viewAliasToInput = viewAliasToInput;
+ this.reduceSinkOperatorsAddedByEnforceBucketingSorting =
+ reduceSinkOperatorsAddedByEnforceBucketingSorting;
}
/**
@@ -304,6 +313,24 @@ public class ParseContext {
this.topToTable = topToTable;
}
+ /**
+ * @return map from each file sink operator to the table it writes into; read by
+ * BucketingSortingReduceSinkOptimizer to find the destination table
+ */
+ public Map<FileSinkOperator, Table> getFsopToTable() {
+ return fsopToTable;
+ }
+
+ /**
+ * @param fsopToTable map from each file sink operator to its destination table
+ */
+ public void setFsopToTable(Map<FileSinkOperator, Table> fsopToTable) {
+ this.fsopToTable = fsopToTable;
+ }
+
+ /**
+ * @return the reduce sink operators that were added to enforce bucketing or sorting
+ * (hive.enforce.bucketing / hive.enforce.sorting); may be null
+ */
+ public List<ReduceSinkOperator> getReduceSinkOperatorsAddedByEnforceBucketingSorting() {
+ return reduceSinkOperatorsAddedByEnforceBucketingSorting;
+ }
+
+ /**
+ * @param reduceSinkOperatorsAddedByEnforceBucketingSorting the reduce sink operators
+ * added to enforce bucketing/sorting
+ */
+ public void setReduceSinkOperatorsAddedByEnforceBucketingSorting(
+ List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting) {
+ this.reduceSinkOperatorsAddedByEnforceBucketingSorting =
+ reduceSinkOperatorsAddedByEnforceBucketingSorting;
+ }
+
/**
* @return the topOps
*/
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java Wed Apr 3 07:05:40 2013
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadat
public class PrunedPartitionList {
// source table
- private Table source;
+ private final Table source;
// confirmed partitions - satisfy the partition criteria
private Set<Partition> confirmedPartns;
@@ -44,7 +44,7 @@ public class PrunedPartitionList {
/**
* @param confirmedPartns
- * confirmed paritions
+ * confirmed partitions
* @param unknownPartns
* unknown partitions
*/
@@ -62,7 +62,7 @@ public class PrunedPartitionList {
/**
* get confirmed partitions.
- *
+ *
* @return confirmedPartns confirmed paritions
*/
public Set<Partition> getConfirmedPartns() {
@@ -71,7 +71,7 @@ public class PrunedPartitionList {
/**
* get unknown partitions.
- *
+ *
* @return unknownPartns unknown paritions
*/
public Set<Partition> getUnknownPartns() {
@@ -80,7 +80,7 @@ public class PrunedPartitionList {
/**
* get denied partitions.
- *
+ *
* @return deniedPartns denied paritions
*/
public Set<Partition> getDeniedPartns() {
@@ -99,7 +99,7 @@ public class PrunedPartitionList {
/**
* set confirmed partitions.
- *
+ *
* @param confirmedPartns
* confirmed paritions
*/
@@ -109,7 +109,7 @@ public class PrunedPartitionList {
/**
* set unknown partitions.
- *
+ *
* @param unknownPartns
* unknown partitions
*/
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Apr 3 07:05:40 2013
@@ -223,6 +223,8 @@ public class SemanticAnalyzer extends Ba
private Map<JoinOperator, QBJoinTree> joinContext;
private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
private final HashMap<TableScanOperator, Table> topToTable;
+ private final Map<FileSinkOperator, Table> fsopToTable;
+ private final List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
private QB qb;
private ASTNode ast;
private int destTableId;
@@ -284,6 +286,8 @@ public class SemanticAnalyzer extends Ba
joinContext = new HashMap<JoinOperator, QBJoinTree>();
smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
topToTable = new HashMap<TableScanOperator, Table>();
+ fsopToTable = new HashMap<FileSinkOperator, Table>();
+ reduceSinkOperatorsAddedByEnforceBucketingSorting = new ArrayList<ReduceSinkOperator>();
destTableId = 1;
uCtx = null;
listMapJoinOpsNoReducer = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
@@ -342,11 +346,13 @@ public class SemanticAnalyzer extends Ba
public ParseContext getParseContext() {
return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
- topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, loadTableWork,
+ topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable,
+ fsopToTable, loadTableWork,
loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
- opToPartToSkewedPruner, viewAliasToInput);
+ opToPartToSkewedPruner, viewAliasToInput,
+ reduceSinkOperatorsAddedByEnforceBucketingSorting);
}
@SuppressWarnings("nls")
@@ -5262,6 +5268,7 @@ public class SemanticAnalyzer extends Ba
+ dest_path + " row schema: " + inputRR.toString());
}
+ fsopToTable.put((FileSinkOperator) output, dest_tab);
return output;
}
@@ -5669,6 +5676,7 @@ public class SemanticAnalyzer extends Ba
partitionCols, order.toString(), numReducers),
new RowSchema(inputRR.getColumnInfos()), input), inputRR);
interim.setColumnExprMap(colExprMap);
+ reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator) interim);
// Add the extract operator to get the value fields
RowResolver out_rwsch = new RowResolver();
@@ -5691,6 +5699,7 @@ public class SemanticAnalyzer extends Ba
LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() +
" row schema: " + out_rwsch.toString());
}
+
return output;
}
@@ -8646,11 +8655,12 @@ public class SemanticAnalyzer extends Ba
ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext,
- topToTable,
+ topToTable, fsopToTable,
loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
- opToPartToSkewedPruner, viewAliasToInput);
+ opToPartToSkewedPruner, viewAliasToInput,
+ reduceSinkOperatorsAddedByEnforceBucketingSorting);
// Generate table access stats if required
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=1463841&r1=1463840&r2=1463841&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Wed Apr 3 07:05:40 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -46,7 +47,7 @@ public class TableScanDesc extends Abstr
/**
* Used for split sampling (row count per split)
* For example,
- * select count(1) from ss_src2 tablesample(10 ROWS);
+ * select count(1) from ss_src2 tablesample (10 ROWS) s;
* provides first 10 rows from all input splits
*/
private int rowLimit = -1;
@@ -67,6 +68,9 @@ public class TableScanDesc extends Abstr
public static final String FILTER_TEXT_CONF_STR =
"hive.io.filter.text";
+ // input file name (big) to bucket number
+ private Map<String, Integer> bucketFileNameMapping;
+
@SuppressWarnings("nls")
public TableScanDesc() {
}
@@ -170,4 +174,12 @@ public class TableScanDesc extends Abstr
public Integer getRowLimitExplain() {
return rowLimit >= 0 ? rowLimit : null;
}
+
+ public Map<String, Integer> getBucketFileNameMapping() {
+ return bucketFileNameMapping;
+ }
+
+ public void setBucketFileNameMapping(Map<String, Integer> bucketFileNameMapping) {
+ this.bucketFileNameMapping = bucketFileNameMapping;
+ }
}
Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_18.q Wed Apr 3 07:05:40 2013
@@ -0,0 +1,67 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+-- Create two bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 PARTITION (ds = '1') SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+select count(*) from test_table1 where ds = '1';
+select count(*) from test_table1 where ds = '1' and hash(key) % 2 = 0;
+select count(*) from test_table1 where ds = '1' and hash(key) % 2 = 1;
+select count(*) from test_table1 tablesample (bucket 1 out of 2) s where ds = '1';
+select count(*) from test_table1 tablesample (bucket 2 out of 2) s where ds = '1';
+
+select count(*) from test_table2 where ds = '1';
+select count(*) from test_table2 where ds = '1' and hash(key) % 2 = 0;
+select count(*) from test_table2 where ds = '1' and hash(key) % 2 = 1;
+select count(*) from test_table2 tablesample (bucket 1 out of 2) s where ds = '1';
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s where ds = '1';
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation, one of the buckets should be empty
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '2')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1' and a.key = 238;
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '2')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1' and a.key = 238;
+
+select count(*) from test_table2 where ds = '2';
+select count(*) from test_table2 where ds = '2' and hash(key) % 2 = 0;
+select count(*) from test_table2 where ds = '2' and hash(key) % 2 = 1;
+select count(*) from test_table2 tablesample (bucket 1 out of 2) s where ds = '2';
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s where ds = '2';
+
+-- Copy partition ds = '2' of the bucketed table into a new partition ds = '3' of the same table
+-- This should be a map-only operation
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '3')
+SELECT a.key, a.value FROM test_table2 a WHERE a.ds = '2';
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '3')
+SELECT a.key, a.value FROM test_table2 a WHERE a.ds = '2';
+
+select count(*) from test_table2 where ds = '3';
+select count(*) from test_table2 where ds = '3' and hash(key) % 2 = 0;
+select count(*) from test_table2 where ds = '3' and hash(key) % 2 = 1;
+select count(*) from test_table2 tablesample (bucket 1 out of 2) s where ds = '3';
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s where ds = '3';
Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_19.q Wed Apr 3 07:05:40 2013
@@ -0,0 +1,41 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+-- Create two tables, each bucketed and sorted by key into 16 buckets
+CREATE TABLE test_table1 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 16 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 16 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 PARTITION (ds = '1') SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation (bucket/sort columns and bucket counts match)
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+select count(*) from test_table1 where ds = '1';
+select count(*) from test_table1 where ds = '1' and hash(key) % 16 = 0;
+select count(*) from test_table1 where ds = '1' and hash(key) % 16 = 5;
+select count(*) from test_table1 where ds = '1' and hash(key) % 16 = 12;
+select count(*) from test_table1 tablesample (bucket 1 out of 16) s where ds = '1';
+select count(*) from test_table1 tablesample (bucket 6 out of 16) s where ds = '1';
+select count(*) from test_table1 tablesample (bucket 13 out of 16) s where ds = '1';
+
+select count(*) from test_table2 where ds = '1';
+select count(*) from test_table2 where ds = '1' and hash(key) % 16 = 0;
+select count(*) from test_table2 where ds = '1' and hash(key) % 16 = 5;
+select count(*) from test_table2 where ds = '1' and hash(key) % 16 = 12;
+select count(*) from test_table2 tablesample (bucket 1 out of 16) s where ds = '1';
+select count(*) from test_table2 tablesample (bucket 6 out of 16) s where ds = '1';
+select count(*) from test_table2 tablesample (bucket 13 out of 16) s where ds = '1';
Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_20.q Wed Apr 3 07:05:40 2013
@@ -0,0 +1,53 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+-- Create two bucketed and sorted tables (key is INT in test_table1 but STRING in test_table2)
+CREATE TABLE test_table1 (key int, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key STRING, value1 STRING, value2 string) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 PARTITION (ds = '1') SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- with different datatypes. This should be a map-reduce operation
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value, a.value FROM test_table1 a WHERE a.ds = '1';
+
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value, a.value FROM test_table1 a WHERE a.ds = '1';
+
+select count(*) from test_table2 where ds = '1';
+select count(*) from test_table2 where ds = '1' and hash(key) % 2 = 0;
+select count(*) from test_table2 where ds = '1' and hash(key) % 2 = 1;
+
+CREATE TABLE test_table3 (key STRING, value1 int, value2 string) PARTITIONED BY (ds STRING)
+CLUSTERED BY (value1) SORTED BY (value1) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation, although the bucketing positions don't match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1')
+SELECT a.value, a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+INSERT OVERWRITE TABLE test_table3 PARTITION (ds = '1')
+SELECT a.value, a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+select count(*) from test_table3 where ds = '1';
+select count(*) from test_table3 where ds = '1' and hash(value1) % 2 = 0;
+select count(*) from test_table3 where ds = '1' and hash(value1) % 2 = 1;
+select count(*) from test_table3 tablesample (bucket 1 out of 2) s where ds = '1';
+select count(*) from test_table3 tablesample (bucket 2 out of 2) s where ds = '1';
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- However, since an expression (a.key+a.key) is selected into the bucketing column, it should involve a reducer
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '2')
+SELECT a.key+a.key, a.value, a.value FROM test_table1 a WHERE a.ds = '1';
Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_21.q Wed Apr 3 07:05:40 2013
@@ -0,0 +1,77 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+-- Create two bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 PARTITION (ds = '1') SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key desc) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the sort orders do not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key, value) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the sort columns do not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (value) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the sort columns do not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the number of buckets does not match
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
+
+drop table test_table2;
+
+CREATE TABLE test_table2 (key INT, value STRING) PARTITIONED BY (ds STRING)
+CLUSTERED BY (key) INTO 2 BUCKETS;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-reduce operation since the target table is not sorted (sort columns do not match)
+EXPLAIN
+INSERT OVERWRITE TABLE test_table2 PARTITION (ds = '1')
+SELECT a.key, a.value FROM test_table1 a WHERE a.ds = '1';
Added: hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q?rev=1463841&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/smb_mapjoin_22.q Wed Apr 3 07:05:40 2013
@@ -0,0 +1,55 @@
+set hive.optimize.bucketmapjoin = true;
+set hive.optimize.bucketmapjoin.sortedmerge = true;
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+set hive.exec.reducers.max = 1;
+set hive.merge.mapfiles=false;
+set hive.merge.mapredfiles=false;
+
+-- Create two bucketed and sorted tables
+CREATE TABLE test_table1 (key INT, value STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 SELECT *;
+
+-- Insert data into the bucketed table by selecting from another bucketed table
+-- This should be a map-only operation
+EXPLAIN INSERT OVERWRITE TABLE test_table2
+SELECT * FROM test_table1;
+
+INSERT OVERWRITE TABLE test_table2
+SELECT * FROM test_table1;
+
+select count(*) from test_table1;
+select count(*) from test_table1 tablesample (bucket 2 out of 2) s;
+
+select count(*) from test_table2;
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s;
+
+drop table test_table1;
+drop table test_table2;
+
+CREATE TABLE test_table1 (key INT, value STRING)
+CLUSTERED BY (key) INTO 2 BUCKETS;
+CREATE TABLE test_table2 (key INT, value STRING)
+CLUSTERED BY (key) INTO 2 BUCKETS;
+
+FROM src
+INSERT OVERWRITE TABLE test_table1 SELECT *;
+
+-- Insert data into the bucketed (but unsorted) table by selecting from another bucketed (but unsorted) table
+-- This should be a map-only operation
+EXPLAIN INSERT OVERWRITE TABLE test_table2
+SELECT * FROM test_table1;
+
+INSERT OVERWRITE TABLE test_table2
+SELECT * FROM test_table1;
+
+select count(*) from test_table1;
+select count(*) from test_table1 tablesample (bucket 2 out of 2) s;
+
+select count(*) from test_table2;
+select count(*) from test_table2 tablesample (bucket 2 out of 2) s;