Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/12 19:35:20 UTC
svn commit: r1467364 - in /hive/branches/vectorization: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/s...
Author: hashutosh
Date: Fri Apr 12 17:35:19 2013
New Revision: 1467364
URL: http://svn.apache.org/r1467364
Log:
Merged with trunk
Added:
hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_1.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_1.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_2.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_2.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_3.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_3.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_4.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_4.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_5.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_5.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_7.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_7.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_8.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_8.q
hive/branches/vectorization/ql/src/test/queries/clientpositive/multi_insert_lateral_view.q
- copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/multi_insert_lateral_view.q
hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_1.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_1.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_2.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_2.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_3.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_3.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_4.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_4.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_5.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_5.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_6.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_6.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_7.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_7.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
hive/branches/vectorization/ql/src/test/results/clientpositive/multi_insert_lateral_view.q.out
- copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/multi_insert_lateral_view.q.out
Modified:
hive/branches/vectorization/ (props changed)
hive/branches/vectorization/build.properties
hive/branches/vectorization/build.xml
hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/vectorization/conf/hive-default.xml.template
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Propchange: hive/branches/vectorization/
------------------------------------------------------------------------------
svn:mergeinfo = /hive/trunk:1466908-1467363
Modified: hive/branches/vectorization/build.properties
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build.properties?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/build.properties (original)
+++ hive/branches/vectorization/build.properties Fri Apr 12 17:35:19 2013
@@ -17,6 +17,7 @@
Name=Hive
name=hive
version=0.12.0-SNAPSHOT
+hcatalog.version=0.11.0-SNAPSHOT
year=2012
javac.debug=on
Modified: hive/branches/vectorization/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build.xml?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/build.xml (original)
+++ hive/branches/vectorization/build.xml Fri Apr 12 17:35:19 2013
@@ -515,7 +515,7 @@
inheritAll="false"/>
<mkdir dir="${build.dir.hive}/hcatalog"/>
<copy todir="${build.dir.hive}/hcatalog">
- <fileset dir="${hive.root}/hcatalog/build/hcatalog-${version}"/>
+ <fileset dir="${hive.root}/hcatalog/build/hcatalog-${hcatalog.version}"/>
</copy>
<!-- special case because builtins compilation depends on packaging
@@ -1081,7 +1081,7 @@
todir="${mvn.jar.dir}" />
<copy file="${build.dir.hive}/metastore/hive-metastore-${version}.jar"
todir="${mvn.jar.dir}" />
- <copy file="${build.dir.hive}/hcatalog/hive-hcatalog-${version}.jar"
+ <copy file="${build.dir.hive}/hcatalog/hive-hcatalog-${hcatalog.version}.jar"
todir="${mvn.jar.dir}" />
<copy file="${build.dir.hive}/pdk/hive-pdk-${version}.jar"
todir="${mvn.jar.dir}" />
@@ -1116,7 +1116,7 @@
<copy file="${build.dir.hive}/metastore/pom.xml"
tofile="${mvn.pom.dir}/hive-metastore-${version}.pom" />
<copy file="${build.dir.hive}/hcatalog/pom.xml"
- tofile="${mvn.pom.dir}/hive-hcatalog-${version}.pom" />
+ tofile="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.pom" />
<copy file="${build.dir.hive}/pdk/pom.xml"
tofile="${mvn.pom.dir}/hive-pdk-${version}.pom" />
<copy file="${build.dir.hive}/ql/pom.xml"
@@ -1375,12 +1375,12 @@
<!-- hive-hcatalog -->
<sign-artifact
- input.file="${mvn.pom.dir}/hive-hcatalog-${version}.jar"
- output.file="${mvn.pom.dir}/hive-hcatalog-${version}.jar.asc"
+ input.file="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.jar"
+ output.file="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.jar.asc"
gpg.passphrase="${gpg.passphrase}"/>
<sign-artifact
- input.file="${mvn.pom.dir}/hive-hcatalog-${version}.pom"
- output.file="${mvn.pom.dir}/hive-hcatalog-${version}.pom.asc"
+ input.file="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.pom"
+ output.file="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.pom.asc"
gpg.passphrase="${gpg.passphrase}"/>
<!-- hive-pdk -->
Modified: hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Apr 12 17:35:19 2013
@@ -523,7 +523,7 @@ public class HiveConf extends Configurat
HIVE_AUTO_SORTMERGE_JOIN("hive.auto.convert.sortmerge.join", false),
HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR(
"hive.auto.convert.sortmerge.join.bigtable.selection.policy",
- "org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ"),
+ "org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ"),
HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
HIVEROWOFFSET("hive.exec.rowoffset", false),
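For illustration, a minimal sketch of reading back and overriding the new default through the HiveConf API; the ConfVars constant and both selector class names are taken from the hunk above, while the surrounding boilerplate is assumed:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SmjBigTablePolicyExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // with this change, the default is AvgPartitionSizeBasedBigTableSelectorForAutoSMJ
        String policy = conf.getVar(
            HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR);
        System.out.println(policy);
        // restore the previous leftmost-table behavior, if desired
        conf.setVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR,
            "org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ");
      }
    }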
Modified: hive/branches/vectorization/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/conf/hive-default.xml.template?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/conf/hive-default.xml.template (original)
+++ hive/branches/vectorization/conf/hive-default.xml.template Fri Apr 12 17:35:19 2013
@@ -969,9 +969,11 @@
<property>
<name>hive.auto.convert.sortmerge.join.bigtable.selection.policy</name>
- <value>org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ</value>
+ <value>org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ</value>
<description>The policy to choose the big table for automatic conversion to sort-merge join.
- By default, the leftmost table is assigned the big table. Other policies are based on size:
+ By default, the table with the largest average partition size is assigned as the big table. All policies are:
+ . based on position of the table - the leftmost table is selected
+ org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ.
. based on total size (all the partitions selected in the query) of the table
org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ.
. based on average size (all the partitions selected in the query) of the table
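For example, setting the value to org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ in hive-site.xml picks as the big table the one whose total size, summed over the partitions selected by the query, is largest.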
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Apr 12 17:35:19 2013
@@ -385,7 +385,7 @@ public abstract class CommonJoinOperator
// all evaluation should be processed here for valid aliasFilterTags
//
// for MapJoin, filter tag is pre-calculated in MapredLocalTask and stored with value.
- // when reading the hashtable, MapJoinObjectValue calcuates alias filter and provide it to join
+ // when reading the hashtable, MapJoinObjectValue calculates alias filter and provide it to join
protected ArrayList<Object> getFilteredValue(byte alias, Object row) throws HiveException {
boolean hasFilter = hasFilter(alias);
ArrayList<Object> nr = JoinUtil.computeValues(row, joinValues[alias],
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Apr 12 17:35:19 2013
@@ -1549,6 +1549,7 @@ public abstract class Operator<T extends
start++;
}
builder.append(name);
+ start += name.length();
if (added) {
if (op.getNumChild() > 0) {
List<Operator<?>> children = op.getChildOperators();
@@ -1559,7 +1560,7 @@ public abstract class Operator<T extends
builder.append(' ');
}
}
- toString(builder, visited, children.get(i), start += name.length());
+ toString(builder, visited, children.get(i), start);
}
}
return true;
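The two hunks above fix a cumulative-indentation bug in the operator-tree printer: the expression 'start += name.length()' used to be evaluated inside the recursive call, once per child, so every later sibling drifted further to the right. A minimal standalone sketch of the corrected pattern (hypothetical Node type, not the Hive class):

    import java.util.ArrayList;
    import java.util.List;

    class Node {
      final String name;
      final List<Node> children = new ArrayList<Node>();

      Node(String name) { this.name = name; }

      void print(StringBuilder builder, int start) {
        builder.append(name);
        start += name.length();  // advance the offset once, before the child loop
        for (int i = 0; i < children.size(); i++) {
          if (i > 0) {
            builder.append('\n');
            for (int j = 0; j < start; j++) {
              builder.append(' ');  // align later siblings under the first child
            }
          }
          // the old code passed (start += name.length()) here instead, pushing
          // each additional sibling name.length() characters further right
          children.get(i).print(builder, start);
        }
      }
    }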
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Fri Apr 12 17:35:19 2013
@@ -98,10 +98,11 @@ public class TableScanOperator extends O
// in the execution context. This is needed for the following scenario:
// insert overwrite table T1 select * from T2;
// where T1 and T2 are sorted/bucketed by the same keys into the same number of buckets
- // Although one mapper per file is used (bucketizedinputhiveinput), it is possible that
+ // Although one mapper per file is used (BucketizedInputHiveInput), it is possible that
// any mapper can pick up any file (depending on the size of the files). The bucket number
// corresponding to the input file is stored to name the output bucket file appropriately.
- Map<String, Integer> bucketNameMapping = conf != null ? conf.getBucketFileNameMapping() : null;
+ Map<String, Integer> bucketNameMapping =
+ (conf != null) ? conf.getBucketFileNameMapping() : null;
if ((bucketNameMapping != null) && (!bucketNameMapping.isEmpty())) {
String currentInputFile = getExecContext().getCurrentInputFile();
getExecContext().setFileId(Integer.toString(bucketNameMapping.get(
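A toy illustration of the mapping described above (file names hypothetical; the real map is built by storeBucketPathMapping in the optimizer change below): whichever mapper happens to read a given bucket file can still name its output after the correct bucket:

    import java.util.HashMap;
    import java.util.Map;

    public class BucketNameMappingExample {
      public static void main(String[] args) {
        Map<String, Integer> bucketNameMapping = new HashMap<String, Integer>();
        bucketNameMapping.put("000000_0", 0);  // input bucket file -> bucket number
        bucketNameMapping.put("000001_0", 1);

        String currentInputFile = "000001_0";  // any mapper may be handed any file
        if (bucketNameMapping != null && !bucketNameMapping.isEmpty()) {
          // this mapper's output file is named after bucket 1, matching its input
          String fileId = Integer.toString(bucketNameMapping.get(currentInputFile));
          System.out.println("fileId = " + fileId);
        }
      }
    }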
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Fri Apr 12 17:35:19 2013
@@ -25,11 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Stack;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.exec.ExtractOperator;
@@ -37,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Fi
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -56,6 +54,7 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
/**
@@ -64,12 +63,15 @@ import org.apache.hadoop.hive.ql.plan.Se
* insert overwrite table T1 select * from T2;
* where T1 and T2 are bucketized/sorted on the same keys, we don't need a reducer to
* enforce bucketing and sorting.
+ *
+ * It also optimizes queries of the form:
+ * insert overwrite table T1
+ * select * from T1 join T2 on T1.key = T2.key
+ * where T1 and T2 are bucketized/sorted on the same key 'key', we don't need a reducer
+ * to enforce bucketing and sorting
*/
public class BucketingSortingReduceSinkOptimizer implements Transform {
- private static final Log LOG = LogFactory.getLog(BucketingSortingReduceSinkOptimizer.class
- .getName());
-
public BucketingSortingReduceSinkOptimizer() {
}
@@ -77,7 +79,6 @@ public class BucketingSortingReduceSinkO
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- HiveConf conf = pctx.getConf();
// process reduce sink added by hive.enforce.bucketing or hive.enforce.sorting
opRules.put(new RuleRegExp("R1",
@@ -90,7 +91,7 @@ public class BucketingSortingReduceSinkO
Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
- // Create a list of topop nodes
+ // Create a list of top nodes
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pctx.getTopOps().values());
ogw.startWalking(topNodes, null);
@@ -117,7 +118,6 @@ public class BucketingSortingReduceSinkO
*
*/
public class BucketSortReduceSinkProcessor implements NodeProcessor {
-
protected ParseContext pGraphContext;
public BucketSortReduceSinkProcessor(ParseContext pGraphContext) {
@@ -142,28 +142,33 @@ public class BucketingSortingReduceSinkO
}
// Get the sort positions and sort order for the table
- private List<ObjectPair<Integer, Integer>> getSortPositions(List<Order> tabSortCols,
+ // The sort order contains whether the sorting is happening ascending or descending
+ private ObjectPair<List<Integer>, List<Integer>> getSortPositionsOrder(
+ List<Order> tabSortCols,
List<FieldSchema> tabCols) {
- List<ObjectPair<Integer, Integer>> posns = new ArrayList<ObjectPair<Integer, Integer>>();
+ List<Integer> sortPositions = new ArrayList<Integer>();
+ List<Integer> sortOrders = new ArrayList<Integer>();
for (Order sortCol : tabSortCols) {
int pos = 0;
for (FieldSchema tabCol : tabCols) {
if (sortCol.getCol().equals(tabCol.getName())) {
- posns.add(new ObjectPair<Integer, Integer>(pos, sortCol.getOrder()));
+ sortPositions.add(pos);
+ sortOrders.add(sortCol.getOrder());
break;
}
pos++;
}
}
- return posns;
+ return new ObjectPair<List<Integer>, List<Integer>>(sortPositions, sortOrders);
}
- // Return true if the parition is bucketed/sorted by the specified positions
+ // Return true if the partition is bucketed/sorted by the specified positions
// The number of buckets, the sort order should also match along with the
// columns which are bucketed/sorted
private boolean checkPartition(Partition partition,
List<Integer> bucketPositionsDest,
- List<ObjectPair<Integer, Integer>> sortPositionsDest,
+ List<Integer> sortPositionsDest,
+ List<Integer> sortOrderDest,
int numBucketsDest) {
// The bucketing and sorting positions should exactly match
int numBuckets = partition.getBucketCount();
@@ -173,10 +178,11 @@ public class BucketingSortingReduceSinkO
List<Integer> partnBucketPositions =
getBucketPositions(partition.getBucketCols(), partition.getTable().getCols());
- List<ObjectPair<Integer, Integer>> partnSortPositions =
- getSortPositions(partition.getSortCols(), partition.getTable().getCols());
+ ObjectPair<List<Integer>, List<Integer>> partnSortPositionsOrder =
+ getSortPositionsOrder(partition.getSortCols(), partition.getTable().getCols());
return bucketPositionsDest.equals(partnBucketPositions) &&
- sortPositionsDest.equals(partnSortPositions);
+ sortPositionsDest.equals(partnSortPositionsOrder.getFirst()) &&
+ sortOrderDest.equals(partnSortPositionsOrder.getSecond());
}
// Return true if the table is bucketed/sorted by the specified positions
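A small sketch of the refactored return shape (toy values, and assuming the metastore Order convention that 1 encodes ascending and 0 descending): getSortPositionsOrder now returns two parallel lists, so callers such as checkPartition can compare sort positions and sort orders against the destination independently:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.common.ObjectPair;

    public class SortPositionsOrderExample {
      public static void main(String[] args) {
        // destination table sorted by (column 0 ASC, column 2 DESC)
        List<Integer> sortPositionsDest = Arrays.asList(0, 2);
        List<Integer> sortOrderDest = Arrays.asList(1, 0);
        // shape now returned by getSortPositionsOrder for a matching source
        ObjectPair<List<Integer>, List<Integer>> posOrder =
            new ObjectPair<List<Integer>, List<Integer>>(
                Arrays.asList(0, 2), Arrays.asList(1, 0));
        boolean matches = sortPositionsDest.equals(posOrder.getFirst())
            && sortOrderDest.equals(posOrder.getSecond());
        System.out.println(matches);  // true
      }
    }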
@@ -184,7 +190,8 @@ public class BucketingSortingReduceSinkO
// columns which are bucketed/sorted
private boolean checkTable(Table table,
List<Integer> bucketPositionsDest,
- List<ObjectPair<Integer, Integer>> sortPositionsDest,
+ List<Integer> sortPositionsDest,
+ List<Integer> sortOrderDest,
int numBucketsDest) {
// The bucketing and sorting positions should exactly match
int numBuckets = table.getNumBuckets();
@@ -194,12 +201,17 @@ public class BucketingSortingReduceSinkO
List<Integer> tableBucketPositions =
getBucketPositions(table.getBucketCols(), table.getCols());
- List<ObjectPair<Integer, Integer>> tableSortPositions =
- getSortPositions(table.getSortCols(), table.getCols());
+ ObjectPair<List<Integer>, List<Integer>> tableSortPositionsOrder =
+ getSortPositionsOrder(table.getSortCols(), table.getCols());
return bucketPositionsDest.equals(tableBucketPositions) &&
- sortPositionsDest.equals(tableSortPositions);
+ sortPositionsDest.equals(tableSortPositionsOrder.getFirst()) &&
+ sortOrderDest.equals(tableSortPositionsOrder.getSecond());
}
+ // Store the bucket path to bucket number mapping in the table scan operator.
+ // Although one mapper per file is used (BucketizedInputHiveInput), it is possible that
+ // any mapper can pick up any file (depending on the size of the files). The bucket number
+ // corresponding to the input file is stored to name the output bucket file appropriately.
private void storeBucketPathMapping(TableScanOperator tsOp, FileStatus[] srcs) {
Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>();
for (int pos = 0; pos < srcs.length; pos++) {
@@ -222,12 +234,12 @@ public class BucketingSortingReduceSinkO
// Store the mapping -> path, bucket number
// This is needed since for the map-only job, any mapper can process any file.
// For eg: if mapper 1 is processing the file corresponding to bucket 2, it should
- // also output the file correspodning to bucket 2 of the output.
+ // also output the file corresponding to bucket 2 of the output.
storeBucketPathMapping(tsOp, srcs);
}
// Remove the reduce sink operator
- // Use bucketized hive input format so that one mapper processes exactly one file
+ // Use BucketizedHiveInputFormat so that one mapper processes exactly one file
private void removeReduceSink(ReduceSinkOperator rsOp,
TableScanOperator tsOp,
FileSinkOperator fsOp) {
@@ -251,6 +263,97 @@ public class BucketingSortingReduceSinkO
return -1;
}
+ // The output columns for the destination table should match with the join keys
+ // This is to handle queries of the form:
+ // insert overwrite table T3
+ // select T1.key, T1.key2, UDF(T1.value, T2.value)
+ // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+ // where T1, T2 and T3 are bucketized/sorted on key and key2
+ // Assuming T1 is the table on which the mapper is run, the following is true:
+ // . The number of buckets for T1 and T3 should be same
+ // . The bucketing/sorting columns for T1, T2 and T3 should be same
+ // . The sort order of T1 should match with the sort order for T3.
+ // . If T1 is partitioned, only a single partition of T1 can be selected.
+ // . The select list should contain (T1.key, T1.key2) or (T2.key, T2.key2)
+ // . After the join, only selects and filters are allowed.
+ private boolean validateSMBJoinKeys(SMBJoinDesc smbJoinDesc,
+ List<ExprNodeColumnDesc> sourceTableBucketCols,
+ List<ExprNodeColumnDesc> sourceTableSortCols,
+ List<Integer> sortOrder) {
+ // The sort-merge join creates the output sorted and bucketized by the same columns.
+ // This can be relaxed in the future if there is a requirement.
+ if (!sourceTableBucketCols.equals(sourceTableSortCols)) {
+ return false;
+ }
+
+ // Get the total number of columns selected, and for each output column, store the
+ // base table it points to. For
+ // insert overwrite table T3
+ // select T1.key, T1.key2, UDF(T1.value, T2.value)
+ // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+ // the following arrays are created
+ // [0, 0, 0, 1] --> [T1, T1, T1, T2] (table mapping)
+ // [0, 1, 2, 0] --> [T1.0, T1.1, T1.2, T2.0] (table columns mapping)
+ Byte[] tagOrder = smbJoinDesc.getTagOrder();
+ Map<Byte, List<Integer>> retainList = smbJoinDesc.getRetainList();
+ int totalNumberColumns = 0;
+ for (Byte tag : tagOrder) {
+ totalNumberColumns += retainList.get(tag).size();
+ }
+
+ byte[] columnTableMappings = new byte[totalNumberColumns];
+ int[] columnNumberMappings = new int[totalNumberColumns];
+ int currentColumnPosition = 0;
+ for (Byte tag : tagOrder) {
+ for (int pos = 0; pos < retainList.get(tag).size(); pos++) {
+ columnTableMappings[currentColumnPosition] = tag;
+ columnNumberMappings[currentColumnPosition] = pos;
+ currentColumnPosition++;
+ }
+ }
+
+ // All output columns used for bucketing/sorting of the destination table should
+ // belong to the same input table
+ // insert overwrite table T3
+ // select T1.key, T2.key2, UDF(T1.value, T2.value)
+ // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+ // is not optimized, whereas the insert is optimized if the select list is either changed to
+ // (T1.key, T1.key2, UDF(T1.value, T2.value)) or (T2.key, T2.key2, UDF(T1.value, T2.value))
+ // Get the input table and make sure the keys match
+ List<String> outputColumnNames = smbJoinDesc.getOutputColumnNames();
+ byte tableTag = -1;
+ int[] columnNumbersExprList = new int[sourceTableBucketCols.size()];
+ int currentColPosition = 0;
+ for (ExprNodeColumnDesc bucketCol : sourceTableBucketCols) {
+ String colName = bucketCol.getColumn();
+ int colNumber = outputColumnNames.indexOf(colName);
+ if (colNumber < 0) {
+ return false;
+ }
+ if (tableTag < 0) {
+ tableTag = columnTableMappings[colNumber];
+ }
+ else if (tableTag != columnTableMappings[colNumber]) {
+ return false;
+ }
+ columnNumbersExprList[currentColPosition++] = columnNumberMappings[colNumber];
+ }
+
+ List<ExprNodeDesc> allExprs = smbJoinDesc.getExprs().get(tableTag);
+ List<ExprNodeDesc> keysSelectedTable = smbJoinDesc.getKeys().get(tableTag);
+ currentColPosition = 0;
+ for (ExprNodeDesc keySelectedTable : keysSelectedTable) {
+ if (!(keySelectedTable instanceof ExprNodeColumnDesc)) {
+ return false;
+ }
+ if (!allExprs.get(columnNumbersExprList[currentColPosition++]).isSame(keySelectedTable)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
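To make the two mapping arrays built by validateSMBJoinKeys concrete, a tiny standalone sketch reproducing the example from its comment, with T1 retaining three columns and T2 one (types simplified, values hypothetical):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SmbColumnMappingExample {
      public static void main(String[] args) {
        Byte[] tagOrder = {(byte) 0, (byte) 1};
        Map<Byte, List<Integer>> retainList = new HashMap<Byte, List<Integer>>();
        retainList.put((byte) 0, Arrays.asList(0, 1, 2));  // T1.key, T1.key2, T1.value
        retainList.put((byte) 1, Arrays.asList(0));        // T2.value

        int total = 0;
        for (Byte tag : tagOrder) {
          total += retainList.get(tag).size();
        }
        byte[] columnTableMappings = new byte[total];  // output column -> table tag
        int[] columnNumberMappings = new int[total];   // output column -> column in that table
        int current = 0;
        for (Byte tag : tagOrder) {
          for (int pos = 0; pos < retainList.get(tag).size(); pos++) {
            columnTableMappings[current] = tag;
            columnNumberMappings[current] = pos;
            current++;
          }
        }
        System.out.println(Arrays.toString(columnTableMappings));  // [0, 0, 0, 1]
        System.out.println(Arrays.toString(columnNumberMappings)); // [0, 1, 2, 0]
      }
    }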
@@ -283,14 +386,21 @@ public class BucketingSortingReduceSinkO
if (destTable == null) {
return null;
}
+ int numBucketsDestination = destTable.getNumBuckets();
// Get the positions for sorted and bucketed columns
// For sorted columns, also get the order (ascending/descending) - that should
// also match for this to be converted to a map-only job.
List<Integer> bucketPositions =
getBucketPositions(destTable.getBucketCols(), destTable.getCols());
- List<ObjectPair<Integer, Integer>> sortPositions =
- getSortPositions(destTable.getSortCols(), destTable.getCols());
+ ObjectPair<List<Integer>, List<Integer>> sortOrderPositions =
+ getSortPositionsOrder(destTable.getSortCols(), destTable.getCols());
+ List<Integer> sortPositions = sortOrderPositions.getFirst();
+ List<Integer> sortOrder = sortOrderPositions.getSecond();
+ boolean useBucketSortPositions = true;
// Only selects and filters are allowed
Operator<? extends OperatorDesc> op = rsOp;
@@ -298,119 +408,179 @@ public class BucketingSortingReduceSinkO
// bucketed/sorted columns for the destination table
List<ExprNodeColumnDesc> sourceTableBucketCols = new ArrayList<ExprNodeColumnDesc>();
List<ExprNodeColumnDesc> sourceTableSortCols = new ArrayList<ExprNodeColumnDesc>();
+ op = op.getParentOperators().get(0);
while (true) {
- if (op.getParentOperators().size() > 1) {
- return null;
- }
-
- op = op.getParentOperators().get(0);
if (!(op instanceof TableScanOperator) &&
!(op instanceof FilterOperator) &&
- !(op instanceof SelectOperator)) {
+ !(op instanceof SelectOperator) &&
+ !(op instanceof SMBMapJoinOperator)) {
return null;
}
- // nothing to be done for filters - the output schema does not change.
- if (op instanceof TableScanOperator) {
- Table srcTable = pGraphContext.getTopToTable().get(op);
-
- // Find the positions of the bucketed columns in the table corresponding
- // to the select list.
- // Consider the following scenario:
- // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
- // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
- // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
- // should be optimized.
-
- // Start with the destination: T2, bucketed/sorted position is [1]
- // At the source T1, the column corresponding to that position is [key], which
- // maps to column [0] of T1, which is also bucketed/sorted into the same
- // number of buckets
- List<Integer> newBucketPositions = new ArrayList<Integer>();
- for (int pos = 0; pos < bucketPositions.size(); pos++) {
- ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
- String colName = col.getColumn();
- int bucketPos = findColumnPosition(srcTable.getCols(), colName);
- if (bucketPos < 0) {
- return null;
- }
- newBucketPositions.add(bucketPos);
+ if (op instanceof SMBMapJoinOperator) {
+ // Bucketing and sorting keys should exactly match
+ if (!(bucketPositions.equals(sortPositions))) {
+ return null;
+ }
+ SMBMapJoinOperator smbOp = (SMBMapJoinOperator) op;
+ SMBJoinDesc smbJoinDesc = smbOp.getConf();
+ int posBigTable = smbJoinDesc.getPosBigTable();
+
+ // join keys don't match the bucketing keys
+ List<ExprNodeDesc> keysBigTable = smbJoinDesc.getKeys().get((byte) posBigTable);
+ if (keysBigTable.size() != bucketPositions.size()) {
+ return null;
}
- // Find the positions/order of the sorted columns in the table corresponding
- // to the select list.
- List<ObjectPair<Integer, Integer>> newSortPositions =
- new ArrayList<ObjectPair<Integer, Integer>>();
- for (int pos = 0; pos < sortPositions.size(); pos++) {
- ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
- String colName = col.getColumn();
- int sortPos = findColumnPosition(srcTable.getCols(), colName);
- if (sortPos < 0) {
- return null;
- }
- newSortPositions.add(
- new ObjectPair<Integer, Integer>(sortPos, sortPositions.get(pos).getSecond()));
+ if (!validateSMBJoinKeys(smbJoinDesc, sourceTableBucketCols,
+ sourceTableSortCols, sortOrder)) {
+ return null;
}
+ sourceTableBucketCols.clear();
+ sourceTableSortCols.clear();
+ useBucketSortPositions = false;
- if (srcTable.isPartitioned()) {
- PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
- List<Partition> partitions = prunedParts.getNotDeniedPartns();
-
- // Support for dynamic partitions can be added later
- // The following is not optimized:
- // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
- // where T1 and T2 are bucketed by the same keys and partitioned by ds. hr
- if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+ for (ExprNodeDesc keyBigTable : keysBigTable) {
+ if (!(keyBigTable instanceof ExprNodeColumnDesc)) {
return null;
}
- for (Partition partition : partitions) {
- if (!checkPartition(partition, newBucketPositions, newSortPositions,
- pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
+ sourceTableBucketCols.add((ExprNodeColumnDesc) keyBigTable);
+ sourceTableSortCols.add((ExprNodeColumnDesc) keyBigTable);
+ }
+
+ // since it is a sort-merge join, only follow the big table
+ op = op.getParentOperators().get(posBigTable);
+ } else {
+ // nothing to be done for filters - the output schema does not change.
+ if (op instanceof TableScanOperator) {
+ assert !useBucketSortPositions;
+ Table srcTable = pGraphContext.getTopToTable().get(op);
+
+ // Find the positions of the bucketed columns in the table corresponding
+ // to the select list.
+ // Consider the following scenario:
+ // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
+ // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
+ // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
+ // should be optimized.
+
+ // Start with the destination: T2, bucketed/sorted position is [1]
+ // At the source T1, the column corresponding to that position is [key], which
+ // maps to column [0] of T1, which is also bucketed/sorted into the same
+ // number of buckets
+ List<Integer> newBucketPositions = new ArrayList<Integer>();
+ for (int pos = 0; pos < bucketPositions.size(); pos++) {
+ ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
+ String colName = col.getColumn();
+ int bucketPos = findColumnPosition(srcTable.getCols(), colName);
+ if (bucketPos < 0) {
return null;
}
+ newBucketPositions.add(bucketPos);
}
- removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
- partitions.get(0).getSortedPaths());
- return null;
- }
- else {
- if (!checkTable(srcTable, newBucketPositions, newSortPositions,
- pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
- return null;
+ // Find the positions/order of the sorted columns in the table corresponding
+ // to the select list.
+ List<Integer> newSortPositions = new ArrayList<Integer>();
+ for (int pos = 0; pos < sortPositions.size(); pos++) {
+ ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
+ String colName = col.getColumn();
+ int sortPos = findColumnPosition(srcTable.getCols(), colName);
+ if (sortPos < 0) {
+ return null;
+ }
+ newSortPositions.add(sortPos);
}
- removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
- return null;
- }
- }
- // None of the operators is changing the positions
- else if (op instanceof SelectOperator) {
- SelectOperator selectOp = (SelectOperator) op;
- SelectDesc selectDesc = selectOp.getConf();
+ if (srcTable.isPartitioned()) {
+ PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
+ List<Partition> partitions = prunedParts.getNotDeniedPartns();
+
+ // Support for dynamic partitions can be added later
+ // The following is not optimized:
+ // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
+ // where T1 and T2 are bucketed by the same keys and partitioned by ds. hr
+ if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+ return null;
+ }
+ for (Partition partition : partitions) {
+ if (!checkPartition(partition, newBucketPositions, newSortPositions, sortOrder,
+ numBucketsDestination)) {
+ return null;
+ }
+ }
- // There may be multiple selects - chose the one closest to the table
- sourceTableBucketCols.clear();
- sourceTableSortCols.clear();
+ removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
+ partitions.get(0).getSortedPaths());
+ return null;
+ }
+ else {
+ if (!checkTable(srcTable, newBucketPositions, newSortPositions, sortOrder,
+ numBucketsDestination)) {
+ return null;
+ }
- // Only columns can be selected for both sorted and bucketed positions
- for (int pos : bucketPositions) {
- ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
- if (!(selectColList instanceof ExprNodeColumnDesc)) {
+ removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
return null;
}
- sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
}
+ // None of the operators is changing the positions
+ else if (op instanceof SelectOperator) {
+ SelectOperator selectOp = (SelectOperator) op;
+ SelectDesc selectDesc = selectOp.getConf();
+
+ // Iterate backwards, from the destination table to the top of the tree
+ // Based on the output column names, get the new columns.
+ if (!useBucketSortPositions) {
+ bucketPositions.clear();
+ sortPositions.clear();
+ List<String> outputColumnNames = selectDesc.getOutputColumnNames();
+
+ for (ExprNodeColumnDesc col : sourceTableBucketCols) {
+ String colName = col.getColumn();
+ int colPos = outputColumnNames.indexOf(colName);
+ if (colPos < 0) {
+ return null;
+ }
+ bucketPositions.add(colPos);
+ }
- for (ObjectPair<Integer, Integer> pos : sortPositions) {
- ExprNodeDesc selectColList = selectDesc.getColList().get(pos.getFirst());
- if (!(selectColList instanceof ExprNodeColumnDesc)) {
- return null;
+ for (ExprNodeColumnDesc col : sourceTableSortCols) {
+ String colName = col.getColumn();
+ int colPos = outputColumnNames.indexOf(colName);
+ if (colPos < 0) {
+ return null;
+ }
+ sortPositions.add(colPos);
+ }
+ }
+
+ // There may be multiple selects - choose the one closest to the table
+ sourceTableBucketCols.clear();
+ sourceTableSortCols.clear();
+
+ // Only columns can be selected for both sorted and bucketed positions
+ for (int pos : bucketPositions) {
+ ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+ if (!(selectColList instanceof ExprNodeColumnDesc)) {
+ return null;
+ }
+ sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
+ }
+
+ for (int pos : sortPositions) {
+ ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+ if (!(selectColList instanceof ExprNodeColumnDesc)) {
+ return null;
+ }
+ sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
}
- sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
+
+ useBucketSortPositions = false;
}
+ op = op.getParentOperators().get(0);
}
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Fri Apr 12 17:35:19 2013
@@ -1828,6 +1828,7 @@ body
:
insertClause
selectClause
+ lateralView?
whereClause?
groupByClause?
havingClause?
@@ -1836,11 +1837,12 @@ body
distributeByClause?
sortByClause?
window_clause?
- limitClause? -> ^(TOK_INSERT insertClause?
- selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
+ limitClause? -> ^(TOK_INSERT insertClause
+ selectClause lateralView? whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
distributeByClause? sortByClause? window_clause? limitClause?)
|
selectClause
+ lateralView?
whereClause?
groupByClause?
havingClause?
@@ -1850,7 +1852,7 @@ body
sortByClause?
window_clause?
limitClause? -> ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
- selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
+ selectClause lateralView? whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
distributeByClause? sortByClause? window_clause? limitClause?)
;
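In effect (exercised by the new multi_insert_lateral_view.q test added at the top of this commit), each insert branch of a multi-insert can now carry its own lateral view between its select clause and where clause, and the lateralView node is propagated into that branch's TOK_INSERT subtree. A query of roughly the shape 'from src insert overwrite table t1 select c lateral view explode(arr) lv as c group by c insert overwrite table t2 select key' (table and column names hypothetical) therefore parses with a distinct lateral view per branch.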
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Fri Apr 12 17:35:19 2013
@@ -90,6 +90,8 @@ public class QBParseInfo {
*/
private final HashMap<String, ArrayList<ASTNode>> aliasToLateralViews;
+ private final HashMap<String, ASTNode> destToLateralView;
+
/* Order by clause */
private final HashMap<String, ASTNode> destToOrderby;
private final HashMap<String, Integer> destToLimit;
@@ -111,6 +113,7 @@ public class QBParseInfo {
nameToDest = new HashMap<String, ASTNode>();
nameToSample = new HashMap<String, TableSample>();
exprToColumnAlias = new HashMap<ASTNode, String>();
+ destToLateralView = new HashMap<String, ASTNode>();
destToSelExpr = new LinkedHashMap<String, ASTNode>();
destToWhereExpr = new HashMap<String, ASTNode>();
destToGroupby = new HashMap<String, ASTNode>();
@@ -552,6 +555,9 @@ public class QBParseInfo {
return nameToSample;
}
+ public HashMap<String, ASTNode> getDestToLateralView() {
+ return destToLateralView;
+ }
protected static enum ClauseType {
CLUSTER_BY_CLAUSE,
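A minimal sketch of how the new map is meant to be used (clause naming follows Hive's usual insclause-N convention; the ASTNode here merely stands in for a parsed TOK_LATERAL_VIEW subtree):

    import java.util.HashMap;
    import org.apache.hadoop.hive.ql.parse.ASTNode;

    public class DestToLateralViewExample {
      public static void main(String[] args) {
        HashMap<String, ASTNode> destToLateralView = new HashMap<String, ASTNode>();
        ASTNode lateralViewAst = new ASTNode();  // placeholder for the parsed clause
        // recorded while SemanticAnalyzer walks the insert clause (see below)
        destToLateralView.put("insclause-0", lateralViewAst);
        // consulted later when the plan for that destination is generated
        System.out.println(destToLateralView.get("insclause-0") != null);
      }
    }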
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Apr 12 17:35:19 2013
@@ -1023,7 +1023,13 @@ public class SemanticAnalyzer extends Ba
.getMsg(partition.toString()));
}
}
-
+ skipRecursion = false;
+ break;
+ case HiveParser.TOK_LATERAL_VIEW:
+ // todo: nested LV
+ assert ast.getChildCount() == 1;
+ qb.getParseInfo().getDestToLateralView().put(ctx_1.dest, ast);
+ break;
default:
skipRecursion = false;
break;
@@ -3989,7 +3995,7 @@ public class SemanticAnalyzer extends Ba
}
@SuppressWarnings({"nls"})
- private Operator genGroupByPlan1MRMultiReduceGB(List<String> dests, QB qb, Operator input)
+ private Operator genGroupByPlan1ReduceMultiGBY(List<String> dests, QB qb, Operator input)
throws SemanticException {
QBParseInfo parseInfo = qb.getParseInfo();
@@ -6811,9 +6817,14 @@ public class SemanticAnalyzer extends Ba
// Return the common distinct expression
// There should be more than 1 destination, with group bys in all of them.
private List<ASTNode> getCommonDistinctExprs(QB qb, Operator input) {
- RowResolver inputRR = opParseCtx.get(input).getRowResolver();
QBParseInfo qbp = qb.getParseInfo();
+ // If a grouping set aggregation or a lateral view is present, common processing is not possible
+ if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()
+ || !qbp.getDestToLateralView().isEmpty()) {
+ return null;
+ }
+ RowResolver inputRR = opParseCtx.get(input).getRowResolver();
TreeSet<String> ks = new TreeSet<String>();
ks.addAll(qbp.getClauseNames());
@@ -6822,15 +6833,10 @@ public class SemanticAnalyzer extends Ba
return null;
}
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> oldList = null;
+ List<ExprNodeDesc> oldList = null;
List<ASTNode> oldASTList = null;
for (String dest : ks) {
- // If a grouping set aggregation is present, common processing is not possible
- if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()) {
- return null;
- }
-
// If a filter is present, common processing is not possible
if (qbp.getWhrForClause(dest) != null) {
return null;
@@ -6847,7 +6853,7 @@ public class SemanticAnalyzer extends Ba
return null;
}
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> currDestList;
+ List<ExprNodeDesc> currDestList;
try {
currDestList = getDistinctExprs(qbp, dest, inputRR);
} catch (SemanticException e) {
@@ -6968,10 +6974,9 @@ public class SemanticAnalyzer extends Ba
// Groups the clause names into lists so that any two clauses in the same list has the same
// group by and distinct keys and no clause appears in more than one list. Returns a list of the
// lists of clauses.
- private List<List<String>> getCommonGroupByDestGroups(QB qb, Operator input)
- throws SemanticException {
+ private List<List<String>> getCommonGroupByDestGroups(QB qb,
+ Map<String, Operator<? extends OperatorDesc>> inputs) throws SemanticException {
- RowResolver inputRR = opParseCtx.get(input).getRowResolver();
QBParseInfo qbp = qb.getParseInfo();
TreeSet<String> ks = new TreeSet<String>();
@@ -6989,29 +6994,31 @@ public class SemanticAnalyzer extends Ba
return commonGroupByDestGroups;
}
- List<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>> sprayKeyLists =
- new ArrayList<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>>(ks.size());
+ List<Operator<? extends OperatorDesc>> inputOperators =
+ new ArrayList<Operator<? extends OperatorDesc>>(ks.size());
+ List<List<ExprNodeDesc>> sprayKeyLists = new ArrayList<List<ExprNodeDesc>>(ks.size());
// Iterate over each clause
for (String dest : ks) {
-
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> sprayKeys =
- getDistinctExprs(qbp, dest, inputRR);
+ Operator input = inputs.get(dest);
+ RowResolver inputRR = opParseCtx.get(input).getRowResolver();
+ List<ExprNodeDesc> sprayKeys = getDistinctExprs(qbp, dest, inputRR);
// Add the group by expressions
List<ASTNode> grpByExprs = getGroupByForClause(qbp, dest);
for (ASTNode grpByExpr : grpByExprs) {
- ExprNodeDesc.ExprNodeDescEqualityWrapper grpByExprWrapper =
- new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(grpByExpr, inputRR));
- if (!sprayKeys.contains(grpByExprWrapper)) {
- sprayKeys.add(grpByExprWrapper);
+ ExprNodeDesc exprDesc = genExprNodeDesc(grpByExpr, inputRR);
+ if (ExprNodeDescUtils.indexOf(exprDesc, sprayKeys) < 0) {
+ sprayKeys.add(exprDesc);
}
}
// Loop through each of the lists of exprs, looking for a match
boolean found = false;
for (int i = 0; i < sprayKeyLists.size(); i++) {
-
+ if (!input.equals(inputOperators.get(i))) {
+ continue;
+ }
if (!matchExprLists(sprayKeyLists.get(i), sprayKeys)) {
continue;
}
@@ -7024,6 +7031,7 @@ public class SemanticAnalyzer extends Ba
// No match was found, so create new entries
if (!found) {
+ inputOperators.add(input);
sprayKeyLists.add(sprayKeys);
List<String> destGroup = new ArrayList<String>();
destGroup.add(dest);
@@ -7035,15 +7043,13 @@ public class SemanticAnalyzer extends Ba
}
// Returns whether or not two lists contain the same elements independent of order
- private boolean matchExprLists(List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list1,
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list2) {
+ private boolean matchExprLists(List<ExprNodeDesc> list1, List<ExprNodeDesc> list2) {
if (list1.size() != list2.size()) {
return false;
}
-
- for (ExprNodeDesc.ExprNodeDescEqualityWrapper exprNodeDesc : list1) {
- if (!list2.contains(exprNodeDesc)) {
+ for (ExprNodeDesc exprNodeDesc : list1) {
+ if (ExprNodeDescUtils.indexOf(exprNodeDesc, list2) < 0) {
return false;
}
}
@@ -7051,23 +7057,20 @@ public class SemanticAnalyzer extends Ba
return true;
}
- // Returns a list of the distinct exprs for a given clause name as
- // ExprNodeDesc.ExprNodeDescEqualityWrapper without duplicates
- private List<ExprNodeDesc.ExprNodeDescEqualityWrapper>
- getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR) throws SemanticException {
+ // Returns a list of the distinct exprs without duplicates for a given clause name
+ private List<ExprNodeDesc> getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR)
+ throws SemanticException {
List<ASTNode> distinctAggExprs = qbp.getDistinctFuncExprsForClause(dest);
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> distinctExprs =
- new ArrayList<ExprNodeDesc.ExprNodeDescEqualityWrapper>();
+ List<ExprNodeDesc> distinctExprs = new ArrayList<ExprNodeDesc>();
for (ASTNode distinctAggExpr : distinctAggExprs) {
// 0 is function name
for (int i = 1; i < distinctAggExpr.getChildCount(); i++) {
ASTNode parameter = (ASTNode) distinctAggExpr.getChild(i);
- ExprNodeDesc.ExprNodeDescEqualityWrapper distinctExpr =
- new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(parameter, inputRR));
- if (!distinctExprs.contains(distinctExpr)) {
- distinctExprs.add(distinctExpr);
+ ExprNodeDesc expr = genExprNodeDesc(parameter, inputRR);
+ if (ExprNodeDescUtils.indexOf(expr, distinctExprs) < 0) {
+ distinctExprs.add(expr);
}
}
}
@@ -7096,6 +7099,7 @@ public class SemanticAnalyzer extends Ba
QBParseInfo qbp = qb.getParseInfo();
TreeSet<String> ks = new TreeSet<String>(qbp.getClauseNames());
+ Map<String, Operator<? extends OperatorDesc>> inputs = createInputForDests(qb, input, ks);
// For multi-group by with the same distinct, we ignore all user hints
// currently. It doesn't matter whether he has asked to do
// map-side aggregation or not. Map side aggregation is turned off
@@ -7148,7 +7152,7 @@ public class SemanticAnalyzer extends Ba
// expressions, otherwise treat all the expressions as a single group
if (conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
try {
- commonGroupByDestGroups = getCommonGroupByDestGroups(qb, curr);
+ commonGroupByDestGroups = getCommonGroupByDestGroups(qb, inputs);
} catch (SemanticException e) {
LOG.error("Failed to group clauses by common spray keys.", e);
}
@@ -7168,6 +7172,8 @@ public class SemanticAnalyzer extends Ba
}
String firstDest = commonGroupByDestGroup.get(0);
+ input = inputs.get(firstDest);
+
// Constructs a standard group by plan if:
// There is no other subquery with the same group by/distinct keys or
// (There are no aggregations in a representative query for the group and
@@ -7182,7 +7188,7 @@ public class SemanticAnalyzer extends Ba
// Go over all the destination tables
for (String dest : commonGroupByDestGroup) {
- curr = input;
+ curr = inputs.get(dest);
if (qbp.getWhrForClause(dest) != null) {
curr = genFilterPlan(dest, qb, curr);
@@ -7215,7 +7221,7 @@ public class SemanticAnalyzer extends Ba
curr = genPostGroupByBodyPlan(curr, dest, qb);
}
} else {
- curr = genGroupByPlan1MRMultiReduceGB(commonGroupByDestGroup, qb, input);
+ curr = genGroupByPlan1ReduceMultiGBY(commonGroupByDestGroup, qb, input);
}
}
}
@@ -7228,6 +7234,16 @@ public class SemanticAnalyzer extends Ba
return curr;
}
+ private Map<String, Operator<? extends OperatorDesc>> createInputForDests(QB qb,
+ Operator<? extends OperatorDesc> input, Set<String> dests) throws SemanticException {
+ Map<String, Operator<? extends OperatorDesc>> inputs =
+ new HashMap<String, Operator<? extends OperatorDesc>>();
+ for (String dest : dests) {
+ inputs.put(dest, genLateralViewPlanForDest(dest, qb, input));
+ }
+ return inputs;
+ }
+
private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb)
throws SemanticException {
@@ -8037,71 +8053,7 @@ public class SemanticAnalyzer extends Ba
// -> LateralViewJoinOperator
//
- RowResolver lvForwardRR = new RowResolver();
- RowResolver source = opParseCtx.get(op).getRowResolver();
- for (ColumnInfo col : source.getColumnInfos()) {
- if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
- continue;
- }
- String[] tabCol = source.reverseLookup(col.getInternalName());
- lvForwardRR.put(tabCol[0], tabCol[1], col);
- }
-
- Operator lvForward = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new LateralViewForwardDesc(), new RowSchema(lvForwardRR.getColumnInfos()),
- op), lvForwardRR);
-
- // The order in which the two paths are added is important. The
- // lateral view join operator depends on having the select operator
- // give it the row first.
-
- // Get the all path by making a select(*).
- RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
- // Operator allPath = op;
- Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
- lvForward), allPathRR);
- // Get the UDTF Path
- QB blankQb = new QB(null, null, false);
- Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree
- .getChild(0), blankQb, lvForward);
- // add udtf aliases to QB
- for (String udtfAlias : blankQb.getAliases()) {
- qb.addAlias(udtfAlias);
- }
- RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
-
- // Merge the two into the lateral view join
- // The cols of the merged result will be the combination of both the
- // cols of the UDTF path and the cols of the all path. The internal
- // names have to be changed to avoid conflicts
-
- RowResolver lateralViewRR = new RowResolver();
- ArrayList<String> outputInternalColNames = new ArrayList<String>();
-
- LVmergeRowResolvers(allPathRR, lateralViewRR, outputInternalColNames);
- LVmergeRowResolvers(udtfPathRR, lateralViewRR, outputInternalColNames);
-
- // For PPD, we need a column to expression map so that during the walk,
- // the processor knows how to transform the internal col names.
- // Following steps are dependant on the fact that we called
- // LVmerge.. in the above order
- Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-
- int i = 0;
- for (ColumnInfo c : allPathRR.getColumnInfos()) {
- String internalName = getColumnInternalName(i);
- i++;
- colExprMap.put(internalName,
- new ExprNodeColumnDesc(c.getType(), c.getInternalName(),
- c.getTabAlias(), c.getIsVirtualCol()));
- }
-
- Operator lateralViewJoin = putOpInsertMap(OperatorFactory
- .getAndMakeChild(new LateralViewJoinDesc(outputInternalColNames),
- new RowSchema(lateralViewRR.getColumnInfos()), allPath,
- udtfPath), lateralViewRR);
- lateralViewJoin.setColumnExprMap(colExprMap);
+ Operator lateralViewJoin = genLateralViewPlan(qb, op, lateralViewTree);
op = lateralViewJoin;
}
e.setValue(op);
@@ -8109,6 +8061,85 @@ public class SemanticAnalyzer extends Ba
}
}
+ private Operator genLateralViewPlanForDest(String dest, QB qb, Operator op)
+ throws SemanticException {
+ ASTNode lateralViewTree = qb.getParseInfo().getDestToLateralView().get(dest);
+ if (lateralViewTree != null) {
+ return genLateralViewPlan(qb, op, lateralViewTree);
+ }
+ return op;
+ }
+
+ private Operator genLateralViewPlan(QB qb, Operator op, ASTNode lateralViewTree)
+ throws SemanticException {
+ RowResolver lvForwardRR = new RowResolver();
+ RowResolver source = opParseCtx.get(op).getRowResolver();
+ for (ColumnInfo col : source.getColumnInfos()) {
+ if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
+ continue;
+ }
+ String[] tabCol = source.reverseLookup(col.getInternalName());
+ lvForwardRR.put(tabCol[0], tabCol[1], col);
+ }
+
+ Operator lvForward = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new LateralViewForwardDesc(), new RowSchema(lvForwardRR.getColumnInfos()),
+ op), lvForwardRR);
+
+ // The order in which the two paths are added is important. The
+ // lateral view join operator depends on having the select operator
+ // give it the row first.
+
+ // Get the all path by making a select(*).
+ RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
+ // Operator allPath = op;
+ Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
+ lvForward), allPathRR);
+ // Get the UDTF Path
+ QB blankQb = new QB(null, null, false);
+ Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree
+ .getChild(0), blankQb, lvForward);
+ // add udtf aliases to QB
+ for (String udtfAlias : blankQb.getAliases()) {
+ qb.addAlias(udtfAlias);
+ }
+ RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
+
+ // Merge the two into the lateral view join
+ // The cols of the merged result will be the combination of both the
+ // cols of the UDTF path and the cols of the all path. The internal
+ // names have to be changed to avoid conflicts
+
+ RowResolver lateralViewRR = new RowResolver();
+ ArrayList<String> outputInternalColNames = new ArrayList<String>();
+
+ LVmergeRowResolvers(allPathRR, lateralViewRR, outputInternalColNames);
+ LVmergeRowResolvers(udtfPathRR, lateralViewRR, outputInternalColNames);
+
+ // For PPD, we need a column to expression map so that during the walk,
+ // the processor knows how to transform the internal col names.
+ // Following steps are dependent on the fact that we called
+ // LVmerge.. in the above order
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+ int i = 0;
+ for (ColumnInfo c : allPathRR.getColumnInfos()) {
+ String internalName = getColumnInternalName(i);
+ i++;
+ colExprMap.put(internalName,
+ new ExprNodeColumnDesc(c.getType(), c.getInternalName(),
+ c.getTabAlias(), c.getIsVirtualCol()));
+ }
+
+ Operator lateralViewJoin = putOpInsertMap(OperatorFactory
+ .getAndMakeChild(new LateralViewJoinDesc(outputInternalColNames),
+ new RowSchema(lateralViewRR.getColumnInfos()), allPath,
+ udtfPath), lateralViewRR);
+ lateralViewJoin.setColumnExprMap(colExprMap);
+ return lateralViewJoin;
+ }
+
/**
* A helper function that gets all the columns and respective aliases in the
* source and puts them into dest. It renames the internal names of the
@@ -8704,7 +8735,7 @@ public class SemanticAnalyzer extends Ba
}
if (LOG.isDebugEnabled()) {
- LOG.debug("\n" + Operator.toString(pCtx.getTopOps().values()));
+ LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
Optimizer optm = new Optimizer();
@@ -8713,7 +8744,7 @@ public class SemanticAnalyzer extends Ba
pCtx = optm.optimize();
if (LOG.isDebugEnabled()) {
- LOG.debug("\n" + Operator.toString(pCtx.getTopOps().values()));
+ LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
// Generate column access stats if required - wait until column pruning takes place