You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/06/24 08:32:31 UTC
svn commit: r1605013 [2/3] - in /hive/branches/cbo: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/...
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HiveSwapJoinRule.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HiveSwapJoinRule.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HiveSwapJoinRule.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/HiveSwapJoinRule.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,22 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.rules;
+
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveJoinRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveJoinRel.JoinAlgorithm;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveProjectRel;
+
+import org.eigenbase.rel.rules.SwapJoinRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+
+public class HiveSwapJoinRule extends SwapJoinRule {
+ public static final HiveSwapJoinRule INSTANCE = new HiveSwapJoinRule();
+
+ private HiveSwapJoinRule() {
+ super(HiveJoinRel.class, HiveProjectRel.DEFAULT_PROJECT_FACTORY);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ return super.matches(call)
+ && call.<HiveJoinRel> rel(0).getJoinAlgorithm() == JoinAlgorithm.NONE;
+ }
+}
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/CBOTableStatsValidator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/CBOTableStatsValidator.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/CBOTableStatsValidator.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/CBOTableStatsValidator.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,90 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.stats;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+
+import com.google.common.collect.ImmutableMap;
+
+public class CBOTableStatsValidator {
+ private final CBOValidateStatsContext m_ctx = new CBOValidateStatsContext();
+
+ public boolean validStats(Operator<? extends OperatorDesc> sinkOp, ParseContext pCtx) {
+ Map<Rule, NodeProcessor> rules = ImmutableMap
+ .<Rule, NodeProcessor> builder()
+ .put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+ new TableScanProcessor()).build();
+
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), rules, m_ctx);
+ GraphWalker fWalker = new ForwardWalker(disp);
+
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+
+ try {
+ fWalker.startWalking(topNodes, null);
+ } catch (SemanticException e) {
+ throw new RuntimeException(e);
+ }
+
+ return (m_ctx.m_tabsWithIncompleteStats.isEmpty());
+ }
+
+ public String getIncompleteStatsTabNames() {
+ StringBuilder sb = new StringBuilder();
+ for (String tabName : m_ctx.m_tabsWithIncompleteStats) {
+ if (sb.length() > 1)
+ sb.append(", ");
+ sb.append(tabName);
+ }
+ return sb.toString();
+ }
+
+ private static NodeProcessor getDefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) {
+ return null;
+ // TODO: Shouldn't we throw exception? as this would imply we got an op
+ // tree with no TS
+ }
+ };
+ }
+
+ static class TableScanProcessor implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) {
+ TableScanOperator tableScanOp = (TableScanOperator) nd;
+ Statistics stats = tableScanOp.getStatistics();
+ int noColsWithStats = (stats != null && stats.getColumnStats() != null) ? stats
+ .getColumnStats().size() : 0;
+ if (noColsWithStats != tableScanOp.getNeededColumns().size()) {
+ ((CBOValidateStatsContext) procCtx).m_tabsWithIncompleteStats.add(tableScanOp.getConf()
+ .getAlias());
+ }
+ return null;
+ }
+ }
+
+ static class CBOValidateStatsContext implements NodeProcessorCtx {
+ final private HashSet<String> m_tabsWithIncompleteStats = new HashSet<String>();
+ }
+}
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/FilterSelectivityEstimator.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,181 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.stats;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.relopt.RelOptUtil.InputReferencedVisitor;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexVisitorImpl;
+import org.eigenbase.sql.SqlKind;
+
+public class FilterSelectivityEstimator extends RexVisitorImpl<Double> {
+ private final RelNode m_childRel;
+ private final double m_childCardinality;
+
+ protected FilterSelectivityEstimator(RelNode childRel) {
+ super(true);
+ m_childRel = childRel;
+ m_childCardinality = RelMetadataQuery.getRowCount(m_childRel);
+ }
+
+ public Double estimateSelectivity(RexNode predicate) {
+ return predicate.accept(this);
+ }
+
+ public Double visitCall(RexCall call) {
+ if (!deep) {
+ return 1.0;
+ }
+
+ Double selectivity = null;
+ SqlKind op = call.getKind();
+
+ switch (op) {
+ case AND: {
+ selectivity = computeConjunctionSelectivity(call);
+ break;
+ }
+
+ case OR: {
+ selectivity = computeDisjunctionSelectivity(call);
+ break;
+ }
+
+ case NOT_EQUALS: {
+ selectivity = computeNotEqualitySelectivity(call);
+ }
+
+ case LESS_THAN_OR_EQUAL:
+ case GREATER_THAN_OR_EQUAL:
+ case LESS_THAN:
+ case GREATER_THAN: {
+ selectivity = ((double) 1 / (double) 3);
+ break;
+ }
+
+ case IN: {
+ selectivity = ((double) 1 / ((double) call.operands.size()));
+ break;
+ }
+
+ default:
+ selectivity = computeFunctionSelectivity(call);
+ }
+
+ return selectivity;
+ }
+
+ /**
+ * NDV of "f1(x, y, z) != f2(p, q, r)" ->
+ * "(maxNDV(x,y,z,p,q,r) - 1)/maxNDV(x,y,z,p,q,r)".
+ * <p>
+ *
+ * @param call
+ * @return
+ */
+ private Double computeNotEqualitySelectivity(RexCall call) {
+ double tmpNDV = getMaxNDV(call);
+
+ if (tmpNDV > 1)
+ return (tmpNDV - (double) 1) / tmpNDV;
+ else
+ return 1.0;
+ }
+
+ /**
+ * Selectivity of f(X,y,z) -> 1/maxNDV(x,y,z).
+ * <p>
+ * Note that >, >=, <, <=, = ... are considered generic functions and uses
+ * this method to find their selectivity.
+ *
+ * @param call
+ * @return
+ */
+ private Double computeFunctionSelectivity(RexCall call) {
+ return 1 / getMaxNDV(call);
+ }
+
+ /**
+ * Disjunction Selectivity -> (1 Ð(1-m1/n)(1-m2/n)) where n is the total
+ * number of tuples from child and m1 and m2 is the expected number of tuples
+ * from each part of the disjunction predicate.
+ * <p>
+ * Note we compute m1. m2.. by applying selectivity of the disjunctive element
+ * on the cardinality from child.
+ *
+ * @param call
+ * @return
+ */
+ private Double computeDisjunctionSelectivity(RexCall call) {
+ Double tmpCardinality;
+ Double tmpSelectivity;
+ double selectivity = 1;
+
+ for (RexNode dje : call.getOperands()) {
+ tmpSelectivity = dje.accept(this);
+ if (tmpSelectivity == null) {
+ tmpSelectivity = 0.99;
+ }
+ tmpCardinality = m_childCardinality * tmpSelectivity;
+
+ if (tmpCardinality > 1)
+ tmpSelectivity = (1 - tmpCardinality / m_childCardinality);
+ else
+ tmpSelectivity = 1.0;
+
+ selectivity *= tmpSelectivity;
+ }
+
+ if (selectivity > 1)
+ return (1 - selectivity);
+ else
+ return 1.0;
+ }
+
+ /**
+ * Selectivity of conjunctive predicate -> (selectivity of conjunctive
+ * element1) * (selectivity of conjunctive element2)...
+ *
+ * @param call
+ * @return
+ */
+ private Double computeConjunctionSelectivity(RexCall call) {
+ Double tmpSelectivity;
+ double selectivity = 1;
+
+ for (RexNode cje : call.getOperands()) {
+ tmpSelectivity = cje.accept(this);
+ if (tmpSelectivity != null) {
+ selectivity *= tmpSelectivity;
+ }
+ }
+
+ return selectivity;
+ }
+
+ private Double getMaxNDV(RexCall call) {
+ double tmpNDV;
+ double maxNDV = 1.0;
+ InputReferencedVisitor irv;
+
+ for (RexNode op : call.getOperands()) {
+ if (op instanceof RexInputRef) {
+ tmpNDV = HiveRelMdDistinctRowCount.getDistinctRowCount(m_childRel,
+ ((RexInputRef) op).getIndex());
+ if (tmpNDV > maxNDV)
+ maxNDV = tmpNDV;
+ } else {
+ irv = new InputReferencedVisitor();
+ irv.apply(op);
+ for (Integer childProjIndx : irv.inputPosReferenced) {
+ tmpNDV = HiveRelMdDistinctRowCount.getDistinctRowCount(m_childRel, childProjIndx);
+ if (tmpNDV > maxNDV)
+ maxNDV = tmpNDV;
+ }
+ }
+ }
+
+ return maxNDV;
+ }
+}
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdDistinctRowCount.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdDistinctRowCount.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdDistinctRowCount.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdDistinctRowCount.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,62 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.stats;
+
+import java.util.BitSet;
+import java.util.List;
+
+import net.hydromatic.optiq.BuiltinMethod;
+
+import org.apache.hadoop.hive.ql.optimizer.optiq.HiveOptiqUtil;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveTableScanRel;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.eigenbase.rel.JoinRelBase;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.ReflectiveRelMetadataProvider;
+import org.eigenbase.rel.metadata.RelMdDistinctRowCount;
+import org.eigenbase.rel.metadata.RelMdUtil;
+import org.eigenbase.rel.metadata.RelMetadataProvider;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.util14.NumberUtil;
+
+public class HiveRelMdDistinctRowCount extends RelMdDistinctRowCount {
+ public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
+ BuiltinMethod.DISTINCT_ROW_COUNT.method,
+ new HiveRelMdDistinctRowCount());
+
+ private HiveRelMdDistinctRowCount() {
+ }
+
+ // Catch-all rule when none of the others apply.
+ @Override
+ public Double getDistinctRowCount(RelNode rel, BitSet groupKey, RexNode predicate) {
+ if (rel instanceof HiveTableScanRel) {
+ return getDistinctRowCount((HiveTableScanRel) rel, groupKey, predicate);
+ }
+
+ return NumberUtil.multiply(RelMetadataQuery.getRowCount(rel),
+ RelMetadataQuery.getSelectivity(rel, predicate));
+ }
+
+ private Double getDistinctRowCount(HiveTableScanRel htRel, BitSet groupKey, RexNode predicate) {
+ List<Integer> projIndxLst = HiveOptiqUtil.translateBitSetToProjIndx(groupKey);
+ List<ColStatistics> colStats = htRel.getColStat(projIndxLst);
+ Double noDistinctRows = 1.0;
+ for (ColStatistics cStat : colStats) {
+ noDistinctRows *= cStat.getCountDistint();
+ }
+
+ return Math.min(noDistinctRows, htRel.getRows());
+ }
+
+ public static Double getDistinctRowCount(RelNode r, int indx) {
+ BitSet bitSetOfRqdProj = new BitSet();
+ bitSetOfRqdProj.set(indx);
+ return RelMetadataQuery.getDistinctRowCount(r, bitSetOfRqdProj, r.getCluster().getRexBuilder()
+ .makeLiteral(true));
+ }
+
+ @Override
+ public Double getDistinctRowCount(JoinRelBase rel, BitSet groupKey, RexNode predicate) {
+ return RelMdUtil.getJoinDistinctRowCount(rel, rel.getJoinType(), groupKey, predicate, true);
+ }
+}
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdSelectivity.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdSelectivity.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdSelectivity.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/stats/HiveRelMdSelectivity.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,178 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.stats;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.hydromatic.optiq.BuiltinMethod;
+
+import org.apache.hadoop.hive.ql.optimizer.optiq.JoinUtil.JoinLeafPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.optiq.JoinUtil.JoinPredicateInfo;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveJoinRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveTableScanRel;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.metadata.ReflectiveRelMetadataProvider;
+import org.eigenbase.rel.metadata.RelMdSelectivity;
+import org.eigenbase.rel.metadata.RelMdUtil;
+import org.eigenbase.rel.metadata.RelMetadataProvider;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexUtil;
+
+import com.google.common.collect.ImmutableMap;
+
/**
 * Hive-specific selectivity metadata provider. Table-scan selectivity is
 * delegated to {@link FilterSelectivityEstimator}; inner-join selectivity is
 * estimated as 1/NDV of the combined equi-join condition. Non-inner joins and
 * scans with no predicate default to 1.0.
 */
public class HiveRelMdSelectivity extends RelMdSelectivity {
  public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
      BuiltinMethod.SELECTIVITY.method,
      new HiveRelMdSelectivity());

  protected HiveRelMdSelectivity() {
    super();
  }

  /** Selectivity of a predicate over a table scan; 1.0 when there is none. */
  public Double getSelectivity(HiveTableScanRel t, RexNode predicate) {
    if (predicate != null) {
      FilterSelectivityEstimator filterSelEstmator = new FilterSelectivityEstimator(t);
      return filterSelEstmator.estimateSelectivity(predicate);
    }

    return 1.0;
  }

  /** Join selectivity; only inner joins are estimated, others default to 1.0. */
  public Double getSelectivity(HiveJoinRel j, RexNode predicate) {
    if (j.getJoinType().equals(JoinRelType.INNER)) {
      return computeInnerJoinSelectivity(j, predicate);
    }
    return 1.0;
  }

  /**
   * Inner-join selectivity = 1 / NDV(join condition), where the NDV is a
   * smoothed cross product over the equi-join predicate elements (see the
   * step-3.2 comment below), fenced at the cardinality of the cross join.
   */
  private Double computeInnerJoinSelectivity(HiveJoinRel j, RexNode predicate) {
    double ndvCrossProduct = 1;
    RexNode combinedPredicate = getCombinedPredicateForJoin(j, predicate);
    JoinPredicateInfo jpi = JoinPredicateInfo.constructJoinPredicateInfo(j, combinedPredicate);
    ImmutableMap.Builder<Integer, Double> colStatMapBuilder = ImmutableMap.builder();
    ImmutableMap<Integer, Double> colStatMap;
    // Right-side fields appear after the left-side fields in the join schema.
    int rightOffSet = j.getLeft().getRowType().getFieldCount();

    // 1. Update Col Stats Map with col stats for columns from left side of
    // Join which are part of join keys
    for (Integer ljk : jpi.getProjsFromLeftPartOfJoinKeysInChildSchema()) {
      colStatMapBuilder.put(ljk, HiveRelMdDistinctRowCount.getDistinctRowCount(j.getLeft(), ljk));
    }

    // 2. Update Col Stats Map with col stats for columns from right side of
    // Join which are part of join keys
    for (Integer rjk : jpi.getProjsFromRightPartOfJoinKeysInChildSchema()) {
      colStatMapBuilder.put(rjk + rightOffSet,
          HiveRelMdDistinctRowCount.getDistinctRowCount(j.getRight(), rjk));
    }
    colStatMap = colStatMapBuilder.build();

    // 3. Walk through the Join Condition Building NDV for selectivity
    // NDV of the join can not exceed the cardinality of cross join.
    List<JoinLeafPredicateInfo> peLst = jpi.getEquiJoinPredicateElements();
    int noOfPE = peLst.size();
    if (noOfPE > 0) {
      // 3.1 Use first conjunctive predicate element's NDV as the seed
      ndvCrossProduct = getMaxNDVForJoinSelectivity(peLst.get(0), colStatMap);

      // 3.2 If there is more than one conjunctive predicate element, compute
      // the NDV cross product: multiply the largest element NDV by the
      // log-degraded NDV of every other element, then fence the result at the
      // cross-join cardinality so NDV can not exceed the worst-case join
      // cardinality.
      //
      // NDV of a predicate element is the max NDV over all arguments of its
      // lhs/rhs expressions:
      //   NDV(JoinCondition) = min(leftCard * rightCard,
      //                            ndvCrossProduct(JoinCondition))
      //   ndvCrossProduct(JoinCondition) = ndv(pex)*log(ndv(pe1))*log(ndv(pe2))
      //     where pex is the predicate element with the max NDV
      //   ndv(pe) = max(NDV(left.Expr), NDV(right.Expr))
      //   NDV(expr) = max(NDV(expr args))
      if (noOfPE > 1) {
        double maxNDVSoFar = ndvCrossProduct;
        double ndvToBeSmoothed;
        double tmpNDV;

        for (int i = 1; i < noOfPE; i++) {
          tmpNDV = getMaxNDVForJoinSelectivity(peLst.get(i), colStatMap);
          if (tmpNDV > maxNDVSoFar) {
            // New maximum: demote the previous maximum to "smoothed" status
            // and swap the un-degraded factor in the running product.
            ndvToBeSmoothed = maxNDVSoFar;
            maxNDVSoFar = tmpNDV;
            ndvCrossProduct = (ndvCrossProduct / ndvToBeSmoothed) * tmpNDV;
          } else {
            ndvToBeSmoothed = tmpNDV;
          }
          // TODO: revisit the fence. log(x) < 1 for x < e, so small NDVs are
          // multiplied in directly rather than log-degraded.
          if (ndvToBeSmoothed > 3)
            ndvCrossProduct *= Math.log(ndvToBeSmoothed);
          else
            ndvCrossProduct *= ndvToBeSmoothed;
        }

        ndvCrossProduct = Math.min(
            RelMetadataQuery.getRowCount(j.getLeft()) * RelMetadataQuery.getRowCount(j.getRight()),
            ndvCrossProduct);
      }
    }

    // 4. Join Selectivity = 1/NDV
    return (1 / ndvCrossProduct);
  }

  /**
   * Conjoins the join's own condition with whatever part of
   * {@code additionalPredicate} is not already implied by it; returns the
   * join condition alone if there is nothing extra.
   */
  private RexNode getCombinedPredicateForJoin(HiveJoinRel j, RexNode additionalPredicate) {
    RexNode minusPred = RelMdUtil.minusPreds(j.getCluster().getRexBuilder(), additionalPredicate,
        j.getCondition());

    if (minusPred != null) {
      List<RexNode> minusList = new ArrayList<RexNode>();
      minusList.add(j.getCondition());
      minusList.add(minusPred);

      return RexUtil.composeConjunction(j.getCluster().getRexBuilder(), minusList, true);
    }

    return j.getCondition();
  }

  /**
   * Compute Max NDV to determine Join Selectivity.
   *
   * @param jlpi       one equi-join predicate element
   * @param colStatMap immutable map of projection index (in join schema) to
   *                   column NDV
   * @return max NDV over the element's left and right join-key projections
   */
  private static Double getMaxNDVForJoinSelectivity(JoinLeafPredicateInfo jlpi,
      ImmutableMap<Integer, Double> colStatMap) {
    Double maxNDVSoFar = 1.0;

    maxNDVSoFar = getMaxNDVFromProjections(colStatMap,
        jlpi.getProjsFromLeftPartOfJoinKeysInJoinSchema(), maxNDVSoFar);
    maxNDVSoFar = getMaxNDVFromProjections(colStatMap,
        jlpi.getProjsFromRightPartOfJoinKeysInJoinSchema(), maxNDVSoFar);

    return maxNDVSoFar;
  }

  /**
   * Max NDV among {@code projectionSet}, seeded with {@code defaultMaxNDV}.
   * NOTE(review): assumes every index in projectionSet has an entry in
   * colStatMap; a missing key would NPE on the unboxing compare — confirm
   * the map built in computeInnerJoinSelectivity always covers the set.
   */
  private static Double getMaxNDVFromProjections(Map<Integer, Double> colStatMap,
      Set<Integer> projectionSet, Double defaultMaxNDV) {
    Double colNDV = null;
    Double maxNDVSoFar = defaultMaxNDV;

    for (Integer projIndx : projectionSet) {
      colNDV = colStatMap.get(projIndx);
      if (colNDV > maxNDVSoFar)
        maxNDVSoFar = colNDV;
    }

    return maxNDVSoFar;
  }
}
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ASTBuilder.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ASTBuilder.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ASTBuilder.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ASTBuilder.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,166 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.translator;
+
+import org.apache.hadoop.hive.ql.optimizer.optiq.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.TableAccessRelBase;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.sql.type.SqlTypeName;
+
+class ASTBuilder {
+
+ static ASTBuilder construct(int tokenType, String text) {
+ ASTBuilder b = new ASTBuilder();
+ b.curr = createAST(tokenType, text);
+ return b;
+ }
+
+ static ASTNode createAST(int tokenType, String text) {
+ return (ASTNode) ParseDriver.adaptor.create(tokenType, text);
+ }
+
+ static ASTNode destNode() {
+ return ASTBuilder
+ .construct(HiveParser.TOK_DESTINATION, "TOK_DESTINATION")
+ .add(
+ ASTBuilder.construct(HiveParser.TOK_DIR, "TOK_DIR").add(HiveParser.TOK_TMP_FILE,
+ "TOK_TMP_FILE")).node();
+ }
+
+ static ASTNode table(TableAccessRelBase scan) {
+ RelOptHiveTable hTbl = (RelOptHiveTable) scan.getTable();
+ ASTBuilder b = ASTBuilder
+ .construct(HiveParser.TOK_TABREF, "TOK_TABREF")
+ .add(
+ ASTBuilder.construct(HiveParser.TOK_TABNAME, "TOK_TABNAME")
+ .add(HiveParser.Identifier, hTbl.getHiveTableMD().getDbName())
+ .add(HiveParser.Identifier, hTbl.getHiveTableMD().getTableName()))
+ .add(HiveParser.Identifier, hTbl.getName());
+ return b.node();
+ }
+
+ static ASTNode join(ASTNode left, ASTNode right, JoinRelType joinType, ASTNode cond) {
+ ASTBuilder b = null;
+
+ switch (joinType) {
+ case INNER:
+ b = ASTBuilder.construct(HiveParser.TOK_JOIN, "TOK_JOIN");
+ break;
+ case LEFT:
+ b = ASTBuilder.construct(HiveParser.TOK_LEFTOUTERJOIN, "TOK_LEFTOUTERJOIN");
+ break;
+ case RIGHT:
+ b = ASTBuilder.construct(HiveParser.TOK_RIGHTOUTERJOIN, "TOK_RIGHTOUTERJOIN");
+ break;
+ case FULL:
+ b = ASTBuilder.construct(HiveParser.TOK_FULLOUTERJOIN, "TOK_FULLOUTERJOIN");
+ break;
+ }
+
+ b.add(left).add(right).add(cond);
+ return b.node();
+ }
+
+ static ASTNode subQuery(ASTNode qry, String alias) {
+ return ASTBuilder.construct(HiveParser.TOK_SUBQUERY, "TOK_SUBQUERY").add(qry)
+ .add(HiveParser.Identifier, alias).node();
+ }
+
+ static ASTNode qualifiedName(String tableName, String colName) {
+ ASTBuilder b = ASTBuilder
+ .construct(HiveParser.DOT, ".")
+ .add(
+ ASTBuilder.construct(HiveParser.TOK_TABLE_OR_COL, "TOK_TABLE_OR_COL").add(
+ HiveParser.Identifier, tableName)).add(HiveParser.Identifier, colName);
+ return b.node();
+ }
+
+ static ASTNode unqualifiedName(String colName) {
+ ASTBuilder b = ASTBuilder
+.construct(HiveParser.TOK_TABLE_OR_COL,
+ "TOK_TABLE_OR_COL").add(HiveParser.Identifier, colName);
+ return b.node();
+ }
+
+ static ASTNode where(ASTNode cond) {
+ return ASTBuilder.construct(HiveParser.TOK_WHERE, "TOK_WHERE").add(cond).node();
+ }
+
+ static ASTNode having(ASTNode cond) {
+ return ASTBuilder.construct(HiveParser.TOK_HAVING, "TOK_HAVING").add(cond).node();
+ }
+
+ static ASTNode limit(Object value) {
+ return ASTBuilder.construct(HiveParser.TOK_LIMIT, "TOK_LIMIT")
+ .add(HiveParser.Number, value.toString()).node();
+ }
+
+ static ASTNode selectExpr(ASTNode expr, String alias) {
+ return ASTBuilder.construct(HiveParser.TOK_SELEXPR, "TOK_SELEXPR").add(expr)
+ .add(HiveParser.Identifier, alias).node();
+ }
+
+ static ASTNode literal(RexLiteral literal) {
+ Object val = literal.getValue3();
+ int type = 0;
+ SqlTypeName sqlType = literal.getType().getSqlTypeName();
+
+ switch (sqlType) {
+ case TINYINT:
+ type = HiveParser.TinyintLiteral;
+ break;
+ case SMALLINT:
+ type = HiveParser.SmallintLiteral;
+ break;
+ case INTEGER:
+ case BIGINT:
+ type = HiveParser.BigintLiteral;
+ break;
+ case DECIMAL:
+ case FLOAT:
+ case DOUBLE:
+ case REAL:
+ type = HiveParser.Number;
+ break;
+ case VARCHAR:
+ case CHAR:
+ type = HiveParser.StringLiteral;
+ val = "'" + String.valueOf(val) + "'";
+ break;
+ case BOOLEAN:
+ type = ((Boolean) val).booleanValue() ? HiveParser.KW_TRUE
+ : HiveParser.KW_FALSE;
+ break;
+
+ default:
+ throw new RuntimeException("Unsupported Type: " + sqlType);
+ }
+
+ return (ASTNode) ParseDriver.adaptor.create(type, String.valueOf(val));
+ }
+
+ ASTNode curr;
+
+ ASTNode node() {
+ return curr;
+ }
+
+ ASTBuilder add(int tokenType, String text) {
+ ParseDriver.adaptor.addChild(curr, createAST(tokenType, text));
+ return this;
+ }
+
+ ASTBuilder add(ASTBuilder b) {
+ ParseDriver.adaptor.addChild(curr, b.curr);
+ return this;
+ }
+
+ ASTBuilder add(ASTNode n) {
+ if (n != null) {
+ ParseDriver.adaptor.addChild(curr, n);
+ }
+ return this;
+ }
+}
\ No newline at end of file
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ASTConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ASTConverter.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ASTConverter.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ASTConverter.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,420 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.translator;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import net.hydromatic.optiq.util.BitSets;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveSortRel;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.FilterRelBase;
+import org.eigenbase.rel.JoinRelBase;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.RelVisitor;
+import org.eigenbase.rel.SortRel;
+import org.eigenbase.rel.TableAccessRelBase;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexUtil;
+import org.eigenbase.rex.RexVisitorImpl;
+import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.SqlOperator;
+import org.eigenbase.sql.type.BasicSqlType;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Converts an optimized Optiq RelNode plan back into a Hive AST
+ * (a TOK_QUERY tree), one query block at a time.
+ */
+public class ASTConverter {
+
+ // Root of the (sub)plan this converter instance translates.
+ RelNode root;
+ // Accumulates the AST clauses (from/where/groupBy/...) for this query block.
+ HiveAST hiveAST;
+ // Clause-bearing nodes of the current query block, recorded by QBVisitor.
+ RelNode from;
+ FilterRelBase where;
+ AggregateRelBase groupBy;
+ FilterRelBase having;
+ ProjectRelBase select;
+ SortRel order;
+
+ // Column schema exposed by the current source; updated as clauses are built.
+ Schema schema;
+
+ ASTConverter(RelNode root) {
+ this.root = root;
+ hiveAST = new HiveAST();
+ }
+
+ /**
+ * Entry point: converts the given optimized plan into a Hive AST.
+ *
+ * @param relNode optimized plan root
+ * @param resultSchema expected result columns (used by the injector)
+ * @return the TOK_QUERY AST for the plan
+ */
+ public static ASTNode convert(final RelNode relNode, List<FieldSchema> resultSchema) {
+ SortRel sortrel = null;
+ RelNode root = DerivedTableInjector.convertOpTree(relNode, resultSchema);
+
+ // Peel off a top-level sort and hand it to convert(SortRel): ORDER BY /
+ // LIMIT belong to the query block beneath the sort node.
+ if (root instanceof SortRel) {
+ sortrel = (SortRel) root;
+ root = sortrel.getChild();
+ if (!(root instanceof ProjectRelBase))
+ throw new RuntimeException("Child of root sort node is not a project");
+ }
+
+ ASTConverter c = new ASTConverter(root);
+ return c.convert(sortrel);
+ }
+
+ /**
+ * Builds the AST for this converter's query block. The passed-in sortrel
+ * (may be null) supplies the ORDER BY / LIMIT that the caller peeled off
+ * above this block.
+ */
+ public ASTNode convert(SortRel sortrel) {
+ /*
+ * 1. Walk RelNode Graph; note from, where, gBy.. nodes.
+ */
+ new QBVisitor().go(root);
+
+ /*
+ * 2. convert from node.
+ */
+ QueryBlockInfo qb = convertSource(from);
+ schema = qb.schema;
+ hiveAST.from = ASTBuilder.construct(HiveParser.TOK_FROM, "TOK_FROM").add(qb.ast).node();
+
+ /*
+ * 3. convert filterNode
+ */
+ if (where != null) {
+ ASTNode cond = where.getCondition().accept(new RexVisitor(schema));
+ hiveAST.where = ASTBuilder.where(cond);
+ }
+
+ /*
+ * 4. GBy
+ */
+ if (groupBy != null) {
+ ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_GROUPBY, "TOK_GROUPBY");
+ for (int i : BitSets.toIter(groupBy.getGroupSet())) {
+ RexInputRef iRef = new RexInputRef(i, new BasicSqlType(SqlTypeName.ANY));
+ b.add(iRef.accept(new RexVisitor(schema)));
+ }
+ hiveAST.groupBy = b.node();
+ // After the GBy, downstream expressions resolve against the aggregate's
+ // output columns, so the schema is rebuilt here.
+ schema = new Schema(schema, groupBy);
+ }
+
+ /*
+ * 5. Having
+ */
+ if (having != null) {
+ ASTNode cond = having.getCondition().accept(new RexVisitor(schema));
+ hiveAST.having = ASTBuilder.having(cond);
+ }
+
+ /*
+ * 6. Project
+ */
+ int i = 0;
+ ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_SELECT, "TOK_SELECT");
+
+ for (RexNode r : select.getChildExps()) {
+ ASTNode selectExpr = ASTBuilder.selectExpr(r.accept(new RexVisitor(schema)), select
+ .getRowType().getFieldNames().get(i++));
+ b.add(selectExpr);
+ }
+ hiveAST.select = b.node();
+
+ /*
+ * 7. Order
+ * Use in Order By from the block above. RelNode has no pointer to parent
+ * hence we need to go top down; but OB at each block really belong to its
+ * src/from. Hence the need to pass in sortRel for each block from its parent.
+ */
+ if (sortrel != null) {
+ HiveSortRel hiveSort = (HiveSortRel) sortrel;
+ if (!hiveSort.getCollation().getFieldCollations().isEmpty()) {
+ ASTNode orderAst = ASTBuilder.createAST(HiveParser.TOK_ORDERBY, "TOK_ORDERBY");
+ schema = new Schema((HiveSortRel) sortrel);
+ for (RelFieldCollation c : hiveSort.getCollation().getFieldCollations()) {
+ ColumnInfo cI = schema.get(c.getFieldIndex());
+ /*
+ * The RowResolver setup for Select drops Table associations. So setup
+ * ASTNode on unqualified name.
+ */
+ ASTNode astCol = ASTBuilder.unqualifiedName(cI.column);
+ ASTNode astNode = c.getDirection() == RelFieldCollation.Direction.ASCENDING
+ ? ASTBuilder.createAST(HiveParser.TOK_TABSORTCOLNAMEASC, "TOK_TABSORTCOLNAMEASC")
+ : ASTBuilder.createAST(HiveParser.TOK_TABSORTCOLNAMEDESC, "TOK_TABSORTCOLNAMEDESC");
+ astNode.addChild(astCol);
+ orderAst.addChild(astNode);
+ }
+ hiveAST.order = orderAst;
+ }
+ // A fetch expression on the sort becomes the LIMIT clause.
+ RexNode limitExpr = hiveSort.getFetchExpr();
+ if (limitExpr != null) {
+ Object val = ((RexLiteral) limitExpr).getValue2();
+ hiveAST.limit = ASTBuilder.limit(val);
+ }
+
+ }
+
+ return hiveAST.getAST();
+ }
+
+ // Schema of this block's SELECT as seen by an enclosing query, with every
+ // column qualified by the given derived-table alias.
+ private Schema getRowSchema(String tblAlias) {
+ return new Schema(select, tblAlias);
+ }
+
+ /*
+ * Converts the source (FROM) node of a query block: a table scan becomes a
+ * table reference, a join recurses on both inputs, and anything else is
+ * wrapped as an aliased sub-query translated by a fresh ASTConverter.
+ */
+ private QueryBlockInfo convertSource(RelNode r) {
+ Schema s;
+ ASTNode ast;
+
+ if (r instanceof TableAccessRelBase) {
+ TableAccessRelBase f = (TableAccessRelBase) r;
+ s = new Schema(f);
+ ast = ASTBuilder.table(f);
+ } else if (r instanceof JoinRelBase) {
+ JoinRelBase join = (JoinRelBase) r;
+ QueryBlockInfo left = convertSource(join.getLeft());
+ QueryBlockInfo right = convertSource(join.getRight());
+ // Join exposes the concatenation of its inputs' columns; the condition
+ // is resolved against that combined schema.
+ s = new Schema(left.schema, right.schema);
+ ASTNode cond = join.getCondition().accept(new RexVisitor(s));
+ ast = ASTBuilder.join(left.ast, right.ast, join.getJoinType(), cond);
+ } else {
+ ASTConverter src = new ASTConverter(r);
+ // NOTE(review): this block's order field is handed to the sub-query
+ // conversion -- see the step-7 comment in convert(SortRel) about OB
+ // belonging to its source block.
+ ASTNode srcAST = src.convert(order);
+ String sqAlias = ASTConverter.nextAlias();
+ s = src.getRowSchema(sqAlias);
+ ast = ASTBuilder.subQuery(srcAST, sqAlias);
+ }
+ return new QueryBlockInfo(s, ast);
+ }
+
+ /*
+ * Walks the RelNode graph top-down and records the clause-bearing nodes
+ * (from, where, groupBy, having, select, order) of the current query block
+ * on the enclosing ASTConverter. Traversal stops once the block's source
+ * (from) has been identified.
+ */
+ class QBVisitor extends RelVisitor {
+
+ // A filter above an aggregate is the HAVING clause; otherwise it is WHERE.
+ public void handle(FilterRelBase filter) {
+ RelNode child = filter.getChild();
+ if (child instanceof AggregateRelBase) {
+ ASTConverter.this.having = filter;
+ } else {
+ ASTConverter.this.where = filter;
+ }
+ }
+
+ // The first project seen is this block's SELECT; a second one marks the
+ // start of a nested block and becomes the FROM source.
+ public void handle(ProjectRelBase project) {
+ if (ASTConverter.this.select == null) {
+ ASTConverter.this.select = project;
+ } else {
+ ASTConverter.this.from = project;
+ }
+ }
+
+ @Override
+ public void visit(RelNode node, int ordinal, RelNode parent) {
+
+ if (node instanceof TableAccessRelBase) {
+ ASTConverter.this.from = node;
+ } else if (node instanceof FilterRelBase) {
+ handle((FilterRelBase) node);
+ } else if (node instanceof ProjectRelBase) {
+ handle((ProjectRelBase) node);
+ } else if (node instanceof JoinRelBase) {
+ ASTConverter.this.from = node;
+ } else if (node instanceof AggregateRelBase) {
+ ASTConverter.this.groupBy = (AggregateRelBase) node;
+ } else if (node instanceof SortRel) {
+ ASTConverter.this.order = (SortRel) node;
+ }
+ /*
+ * once the source node is reached; stop traversal for this QB
+ */
+ if (ASTConverter.this.from == null) {
+ node.childrenAccept(this);
+ }
+ }
+
+ }
+
+ /*
+ * Translates Rex expressions into Hive AST fragments, resolving input
+ * references against the supplied Schema.
+ */
+ static class RexVisitor extends RexVisitorImpl<ASTNode> {
+
+ private final Schema schema;
+
+ protected RexVisitor(Schema schema) {
+ super(true);
+ this.schema = schema;
+ }
+
+ @Override
+ public ASTNode visitInputRef(RexInputRef inputRef) {
+ ColumnInfo cI = schema.get(inputRef.getIndex());
+ // Aggregate columns carry a pre-built AST (see Schema's GBy ctor);
+ // duplicate it so the stored tree is never shared between parents.
+ if (cI.agg != null) {
+ return (ASTNode) ParseDriver.adaptor.dupTree(cI.agg);
+ }
+ return ASTBuilder.qualifiedName(cI.table, cI.column);
+ }
+
+ @Override
+ public ASTNode visitLiteral(RexLiteral literal) {
+ return ASTBuilder.literal(literal);
+ }
+
+ @Override
+ public ASTNode visitCall(RexCall call) {
+ if (!deep) {
+ return null;
+ }
+
+ SqlOperator op = call.getOperator();
+ List<ASTNode> astNodeLst = new LinkedList<ASTNode>();
+ for (RexNode operand : call.operands) {
+ astNodeLst.add(operand.accept(this));
+ }
+ // Flattened AND/OR calls (see isFlat) are emitted as one n-ary node.
+ if (isFlat(call))
+ return SqlFunctionConverter.buildAST(op, astNodeLst, 0);
+ else
+ return SqlFunctionConverter.buildAST(op, astNodeLst);
+ }
+ }
+
+ /*
+ * Pairs the AST produced for a FROM source with the column schema that the
+ * source exposes to the enclosing query block.
+ */
+ static class QueryBlockInfo {
+ Schema schema;
+ ASTNode ast;
+
+ public QueryBlockInfo(Schema s, ASTNode a) {
+ this.schema = s;
+ this.ast = a;
+ }
+ }
+
+ /*
+ * represents the schema exposed by a QueryBlock.
+ */
+ static class Schema extends ArrayList<ColumnInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ // Schema of a table scan: every field qualified by the table name.
+ Schema(TableAccessRelBase scan) {
+ String tabName = scan.getTable().getQualifiedName().get(0);
+ for (RelDataTypeField field : scan.getRowType().getFieldList()) {
+ add(new ColumnInfo(tabName, field.getName()));
+ }
+ }
+
+ // Schema of a derived table: the project's fields qualified by its alias.
+ Schema(ProjectRelBase select, String alias) {
+ for (RelDataTypeField field : select.getRowType().getFieldList()) {
+ add(new ColumnInfo(alias, field.getName()));
+ }
+ }
+
+ // Schema of a join: left input's columns followed by the right input's.
+ @SuppressWarnings("unchecked")
+ Schema(Schema left, Schema right) {
+ for (ColumnInfo cI : Iterables.concat(left, right)) {
+ add(cI);
+ }
+ }
+
+ // Schema above a GroupBy: the grouping columns from the source followed
+ // by one pre-built aggregation-call AST per AggregateCall.
+ Schema(Schema src, AggregateRelBase gBy) {
+ for (int i : BitSets.toIter(gBy.getGroupSet())) {
+ ColumnInfo cI = src.get(i);
+ add(cI);
+ }
+ List<AggregateCall> aggs = gBy.getAggCallList();
+ for (AggregateCall agg : aggs) {
+ int argCount = agg.getArgList().size();
+ ASTBuilder b = agg.isDistinct() ? ASTBuilder.construct(HiveParser.TOK_FUNCTIONDI,
+ "TOK_FUNCTIONDI") : argCount == 0 ? ASTBuilder.construct(HiveParser.TOK_FUNCTIONSTAR,
+ "TOK_FUNCTIONSTAR") : ASTBuilder.construct(HiveParser.TOK_FUNCTION, "TOK_FUNCTION");
+ b.add(HiveParser.Identifier, agg.getAggregation().getName());
+ for (int i : agg.getArgList()) {
+ RexInputRef iRef = new RexInputRef(i, new BasicSqlType(SqlTypeName.ANY));
+ b.add(iRef.accept(new RexVisitor(src)));
+ }
+ add(new ColumnInfo(null, b.node()));
+ }
+ }
+
+ /**
+ * Assumption:<br>
+ * 1. ProjectRel will always be child of SortRel.<br>
+ * 2. In Optiq every projection in ProjectRelBase is uniquely named
+ * (unambiguous) without using table qualifier (table name).<br>
+ *
+ * @param order
+ * Hive Sort Rel Node
+ * @return Schema
+ */
+ public Schema(HiveSortRel order) {
+ ProjectRelBase select = (ProjectRelBase) order.getChild();
+ for (String projName : select.getRowType().getFieldNames()) {
+ add(new ColumnInfo(null, projName));
+ }
+ }
+ }
+
+ /*
+ * Column information exposed by a QueryBlock: the owning table/alias plus
+ * either a plain column name or a pre-built aggregation AST.
+ */
+ static class ColumnInfo {
+ String table;
+ String column;
+ ASTNode agg;
+
+ // A named column belonging to the given table (or alias).
+ ColumnInfo(String table, String column) {
+ this.table = table;
+ this.column = column;
+ }
+
+ // An aggregation column carrying its expression as a ready-made AST.
+ ColumnInfo(String table, ASTNode agg) {
+ this.table = table;
+ this.agg = agg;
+ }
+
+ // Re-qualifies an existing column under a new alias.
+ ColumnInfo(String alias, ColumnInfo srcCol) {
+ this.table = alias;
+ this.column = srcCol.column;
+ this.agg = srcCol.agg;
+ }
+ }
+
+ // Counter backing nextAlias(); shared by all converter instances.
+ private static AtomicLong derivedTableCounter = new AtomicLong(0);
+
+ // Generates a unique derived-table alias: $hdt$_0, $hdt$_1, ...
+ static String nextAlias() {
+ return "$hdt$_" + derivedTableCounter.getAndIncrement();
+ }
+
+ /*
+ * Holds the per-clause AST fragments of one query block and assembles them
+ * into the final TOK_QUERY tree. Null clauses are skipped by ASTBuilder.add.
+ */
+ static class HiveAST {
+
+ ASTNode from;
+ ASTNode where;
+ ASTNode groupBy;
+ ASTNode having;
+ ASTNode select;
+ ASTNode order;
+ ASTNode limit;
+
+ public ASTNode getAST() {
+ ASTBuilder b = ASTBuilder
+ .construct(HiveParser.TOK_QUERY, "TOK_QUERY")
+ .add(from)
+ .add(
+ ASTBuilder.construct(HiveParser.TOK_INSERT, "TOK_INSERT").add(ASTBuilder.destNode())
+ .add(select).add(where).add(groupBy).add(having).add(order).add(limit));
+ return b.node();
+ }
+ }
+
+ /*
+ * Returns true for AND/OR calls with more than two operands; visitCall
+ * emits such calls as a single flat (n-ary) AST node.
+ */
+ private static boolean isFlat(RexCall call) {
+ if (call.operands == null || call.operands.size() <= 2) {
+ return false;
+ }
+ SqlKind kind = call.getOperator().getKind();
+ return kind == SqlKind.AND || kind == SqlKind.OR;
+ }
+}
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/DerivedTableInjector.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/DerivedTableInjector.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/DerivedTableInjector.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/DerivedTableInjector.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,214 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.translator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveAggregateRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveJoinRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveProjectRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveSortRel;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.EmptyRel;
+import org.eigenbase.rel.FilterRelBase;
+import org.eigenbase.rel.JoinRelBase;
+import org.eigenbase.rel.OneRowRelBase;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SetOpRel;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.rel.TableAccessRelBase;
+import org.eigenbase.rel.TableFunctionRelBase;
+import org.eigenbase.rel.ValuesRelBase;
+import org.eigenbase.rel.rules.MultiJoinRel;
+import org.eigenbase.relopt.hep.HepRelVertex;
+import org.eigenbase.relopt.volcano.RelSubset;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexNode;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+/**
+ * Rewrites an Optiq plan so that every construct Hive expects to see inside
+ * a derived table (sub-query) is exposed through an identity Project, making
+ * the plan directly translatable back to a Hive AST.
+ */
+public class DerivedTableInjector {
+
+ public static RelNode convertOpTree(RelNode rel, List<FieldSchema> resultSchema) {
+ // Disable introducing top level select since Hive seems to have bugs with
+ // OB, Limit in sub query.
+ // RelNode newTopSelect = introduceTopLevelSelectInResultSchema(rel,
+ // resultSchema);
+ RelNode newTopSelect = rel;
+ convertOpTree(newTopSelect, (RelNode) null);
+ return newTopSelect;
+ }
+
+ /*
+ * Walks the plan top-down and, wherever a node sits under a parent that
+ * Hive cannot accept directly (join as a join's right input, filter under
+ * filter/join, GB under join/GB/setop, sort in the wrong position),
+ * inserts an identity Project so the node becomes a derived table.
+ * Branches left empty are node kinds that need no wrapping here.
+ */
+ private static void convertOpTree(RelNode rel, RelNode parent) {
+
+ if (rel instanceof EmptyRel) {
+ // TODO: replace with null scan
+ } else if (rel instanceof HepRelVertex) {
+ // TODO: is this relevant?
+ } else if (rel instanceof HiveJoinRel) {
+ if (!validJoinParent(rel, parent)) {
+ introduceDerivedTable(rel, parent);
+ }
+ } else if (rel instanceof MultiJoinRel) {
+
+ } else if (rel instanceof OneRowRelBase) {
+
+ } else if (rel instanceof RelSubset) {
+
+ } else if (rel instanceof SetOpRel) {
+
+ } else if (rel instanceof SingleRel) {
+ if (rel instanceof FilterRelBase) {
+ if (!validFilterParent(rel, parent)) {
+ introduceDerivedTable(rel, parent);
+ }
+ } else if (rel instanceof HiveSortRel) {
+ if (!validSortParent(rel, parent)) {
+ introduceDerivedTable(rel, parent);
+ }
+ if (!validSortChild((HiveSortRel) rel)) {
+ introduceDerivedTable(((HiveSortRel) rel).getChild(), rel);
+ }
+ } else if (rel instanceof HiveAggregateRel) {
+ if (!validGBParent(rel, parent)) {
+ introduceDerivedTable(rel, parent);
+ }
+ }
+ } else if (rel instanceof TableAccessRelBase) {
+
+ } else if (rel instanceof TableFunctionRelBase) {
+
+ } else if (rel instanceof ValuesRelBase) {
+
+ }
+
+ // Recurse into children; inputs are fetched after any replacement above,
+ // so a newly injected Project is itself visited.
+ List<RelNode> childNodes = rel.getInputs();
+ if (childNodes != null) {
+ for (RelNode r : childNodes) {
+ convertOpTree(r, rel);
+ }
+ }
+ }
+
+ /**
+ * Wraps the plan with a top-level Project whose aliases follow the expected
+ * result schema. Currently unused (see convertOpTree); kept for when the
+ * sub-query OB/Limit issues are resolved.
+ *
+ * @throws RuntimeException if no Project is found under the root, or if the
+ * result schema and the Project's expressions differ in width
+ */
+ private static HiveProjectRel introduceTopLevelSelectInResultSchema(final RelNode rootRel,
+ List<FieldSchema> resultSchema) {
+ RelNode curNode = rootRel;
+ HiveProjectRel rootProjRel = null;
+ while (curNode != null) {
+ if (curNode instanceof HiveProjectRel) {
+ rootProjRel = (HiveProjectRel) curNode;
+ break;
+ }
+ // NOTE(review): relies on the tree-shape assumption below -- every node
+ // on this path is expected to have at least one input.
+ curNode = curNode.getInput(0);
+ }
+
+ // Guard: previously a plan without a HiveProjectRel on this path fell
+ // through to a NullPointerException at getChildExps() below.
+ if (rootProjRel == null) {
+ throw new RuntimeException("Couldn't find a Project node under the plan root");
+ }
+
+ //Assumption: tree could only be (limit)?(OB)?(ProjectRelBase)....
+ List<RexNode> rootChildExps = rootProjRel.getChildExps();
+ if (resultSchema.size() != rootChildExps.size()) {
+ throw new RuntimeException("Result Schema didn't match Optiq Optimized Op Tree Schema");
+ }
+
+ List<RexNode> newSelExps = new ArrayList<RexNode>();
+ List<String> newSelAliases = new ArrayList<String>();
+ for (int i = 0; i < rootChildExps.size(); i++) {
+ newSelExps.add(new RexInputRef(i, rootChildExps.get(i).getType()));
+ newSelAliases.add(resultSchema.get(i).getName());
+ }
+
+ return HiveProjectRel.create(rootRel, newSelExps, newSelAliases);
+ }
+
+ /*
+ * Inserts an identity HiveProjectRel between parent and rel so that rel is
+ * exposed as a derived table. The projection simply forwards every field
+ * of rel in its original order.
+ */
+ private static void introduceDerivedTable(final RelNode rel, RelNode parent) {
+ List<RelNode> inputs = parent.getInputs();
+ int pos = -1;
+ for (int idx = 0; idx < inputs.size(); idx++) {
+ // Identity comparison on purpose: we need this exact child instance.
+ if (inputs.get(idx) == rel) {
+ pos = idx;
+ break;
+ }
+ }
+
+ if (pos == -1) {
+ throw new RuntimeException("Couldn't find child node in parent's inputs");
+ }
+
+ // Build the identity projection over rel's row type.
+ List<RexNode> projectList = Lists.transform(rel.getRowType().getFieldList(),
+ new Function<RelDataTypeField, RexNode>() {
+ public RexNode apply(RelDataTypeField field) {
+ return rel.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex());
+ }
+ });
+
+ HiveProjectRel select = HiveProjectRel.create(rel.getCluster(), rel, projectList,
+ rel.getRowType(), rel.getCollationList());
+ parent.replaceInput(pos, select);
+ }
+
+ /*
+ * A join may stay in place unless it is the right input of another join or
+ * a direct input of a set operation.
+ */
+ private static boolean validJoinParent(RelNode joinNode, RelNode parent) {
+ if (parent instanceof JoinRelBase) {
+ return ((JoinRelBase) parent).getRight() != joinNode;
+ }
+ return !(parent instanceof SetOpRel);
+ }
+
+ /*
+ * A filter may stay in place unless its parent is another filter, a join
+ * or a set operation.
+ *
+ * TODO: Verify GB having is not a separate filter (if so we shouldn't
+ * introduce derived table)
+ */
+ private static boolean validFilterParent(RelNode filterNode, RelNode parent) {
+ return !(parent instanceof FilterRelBase || parent instanceof JoinRelBase
+ || parent instanceof SetOpRel);
+ }
+
+ /*
+ * A group-by may stay in place unless its parent is a join, a set
+ * operation or another aggregate.
+ *
+ * TODO: Verify GB having is not a separate filter (if so we shouldn't
+ * introduce derived table)
+ */
+ private static boolean validGBParent(RelNode gbNode, RelNode parent) {
+ return !(parent instanceof JoinRelBase || parent instanceof SetOpRel
+ || parent instanceof AggregateRelBase);
+ }
+
+ /*
+ * A sort may stay in place only at the plan root (null parent) or directly
+ * under a project.
+ */
+ private static boolean validSortParent(RelNode sortNode, RelNode parent) {
+ return parent == null || parent instanceof ProjectRelBase;
+ }
+
+ /*
+ * A sort's child must be a project; otherwise a derived table is injected
+ * between them.
+ */
+ private static boolean validSortChild(HiveSortRel sortNode) {
+ return sortNode.getChild() instanceof ProjectRelBase;
+ }
+}
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/RelNodeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/RelNodeConverter.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/RelNodeConverter.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/RelNodeConverter.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,683 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.translator;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.optiq.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveAggregateRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveFilterRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveJoinRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveProjectRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveSortRel;
+import org.apache.hadoop.hive.ql.optimizer.optiq.reloperators.HiveTableScanRel;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ColStatistics;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.Aggregation;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.RelCollation;
+import org.eigenbase.rel.RelCollationImpl;
+import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.TableAccessRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptSchema;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexUtil;
+import org.eigenbase.sql.fun.SqlStdOperatorTable;
+import org.eigenbase.util.CompositeList;
+import org.eigenbase.util.Pair;
+
+import com.esotericsoftware.minlog.Log;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * Translates an optimized Hive operator tree into an Optiq RelNode tree.
+ */
+public class RelNodeConverter {
+ // Maps Hive UDAF names to Optiq aggregation functions; stddev_samp has no
+ // Optiq builtin and is wrapped via SqlFunctionConverter.
+ private static final Map<String, Aggregation> AGG_MAP = ImmutableMap
+ .<String, Aggregation> builder()
+ .put("count", (Aggregation) SqlStdOperatorTable.COUNT)
+ .put("sum", SqlStdOperatorTable.SUM).put("min", SqlStdOperatorTable.MIN)
+ .put("max", SqlStdOperatorTable.MAX).put("avg", SqlStdOperatorTable.AVG)
+ .put("stddev_samp", SqlFunctionConverter.hiveAggFunction("stddev_samp"))
+ .build();
+
+ /**
+ * Walks the Hive operator tree top-down (one rule per operator kind) and
+ * returns the RelNode produced for the given sink operator.
+ *
+ * @param sinkOp operator whose translated RelNode is returned
+ * @return the HiveRel translation of sinkOp's subtree
+ */
+ public static RelNode convert(Operator<? extends OperatorDesc> sinkOp, RelOptCluster cluster,
+ RelOptSchema schema, SemanticAnalyzer sA, ParseContext pCtx) {
+
+ Context ctx = new Context(cluster, schema, sA, pCtx);
+
+ Map<Rule, NodeProcessor> rules = ImmutableMap
+ .<Rule, NodeProcessor> builder()
+ .put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+ new TableScanProcessor())
+ .put(new RuleRegExp("R2", FilterOperator.getOperatorName() + "%"), new FilterProcessor())
+ .put(new RuleRegExp("R3", SelectOperator.getOperatorName() + "%"), new SelectProcessor())
+ .put(new RuleRegExp("R4", JoinOperator.getOperatorName() + "%"), new JoinProcessor())
+ .put(new RuleRegExp("R5", LimitOperator.getOperatorName() + "%"), new LimitProcessor())
+ .put(new RuleRegExp("R6", GroupByOperator.getOperatorName() + "%"), new GroupByProcessor())
+ .put(new RuleRegExp("R7", ReduceSinkOperator.getOperatorName() + "%"),
+ new ReduceSinkProcessor()).build();
+
+ Dispatcher disp = new DefaultRuleDispatcher(new DefaultProcessor(), rules, ctx);
+ GraphWalker egw = new ForwardWalker(disp);
+
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+
+ HashMap<Node, Object> outputMap = new HashMap<Node, Object>();
+ try {
+ egw.startWalking(topNodes, outputMap);
+ } catch (SemanticException se) {
+ // @revisit
+ throw new RuntimeException(se);
+ }
+ return (HiveRel) outputMap.get(sinkOp);
+ }
+
+ /*
+ * Shared translation state: the Optiq cluster/schema, Hive analysis
+ * artifacts, and per-RelNode maps from Hive internal column names to Optiq
+ * field positions.
+ */
+ static class Context implements NodeProcessorCtx {
+ RelOptCluster cluster;
+ RelOptSchema schema;
+ SemanticAnalyzer sA;
+ ParseContext parseCtx;
+ /*
+ * A Map from hive column internalNames to Optiq positions. A separate map
+ * for each Operator.
+ */
+ Map<RelNode, ImmutableMap<String, Integer>> opPositionMap;
+
+ Map<Operator<? extends OperatorDesc>, RelNode> hiveOpToRelNode;
+
+ public Context(RelOptCluster cluster, RelOptSchema schema, SemanticAnalyzer sA,
+ ParseContext parseCtx) {
+ super();
+ this.cluster = cluster;
+ this.schema = schema;
+ this.sA = sA;
+ this.parseCtx = parseCtx;
+ opPositionMap = new HashMap<RelNode, ImmutableMap<String, Integer>>();
+ hiveOpToRelNode = new HashMap<Operator<? extends OperatorDesc>, RelNode>();
+ }
+
+ // Records the internalName -> position map for rNode, taken from the
+ // operator's RowSchema signature order.
+ void buildColumnMap(Operator<? extends OperatorDesc> op, RelNode rNode) {
+ RowSchema rr = op.getSchema();
+ ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
+ int i = 0;
+ for (ColumnInfo ci : rr.getSignature()) {
+ b.put(ci.getInternalName(), i);
+ i++;
+ }
+ opPositionMap.put(rNode, b.build());
+ }
+
+ /*
+ * Why special handling for TableScan? - the RowResolver coming from hive
+ * for TScan still has all the columns, whereas the Optiq type we build is
+ * based on the needed columns in the TScanOp.
+ */
+ void buildColumnMap(TableScanOperator tsOp, RelNode rNode) {
+ RelDataType oType = rNode.getRowType();
+ int i = 0;
+ ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
+ for (String fN : oType.getFieldNames()) {
+ b.put(fN, i);
+ i++;
+ }
+ opPositionMap.put(rNode, b.build());
+ }
+
+ // Remaps positions through a ReduceSink: the RS's output column names are
+ // resolved to the positions of the input columns they forward.
+ Map<String, Integer> reducerMap(Map<String, Integer> inpMap, ReduceSinkOperator rsOp) {
+ ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
+ Map<String, ExprNodeDesc> colExprMap = rsOp.getColumnExprMap();
+ for (Map.Entry<String, ExprNodeDesc> e : colExprMap.entrySet()) {
+ String inpCol = ((ExprNodeColumnDesc) e.getValue()).getColumn();
+ b.put(e.getKey(), inpMap.get(inpCol));
+ }
+ return b.build();
+ }
+
+ /*
+ * The Optiq JoinRel datatype is formed by combining the columns from its
+ * input RelNodes. Whereas the Hive RowResolver of the JoinOp contains only
+ * the columns needed by childOps.
+ */
+ void buildColumnMap(JoinOperator jOp, HiveJoinRel jRel) throws SemanticException {
+ RowResolver rr = sA.getRowResolver(jOp);
+ QBJoinTree hTree = parseCtx.getJoinContext().get(jOp);
+ Map<String, Integer> leftMap = opPositionMap.get(jRel.getLeft());
+ Map<String, Integer> rightMap = opPositionMap.get(jRel.getRight());
+ leftMap = reducerMap(leftMap, (ReduceSinkOperator) jOp.getParentOperators().get(0));
+ rightMap = reducerMap(rightMap, (ReduceSinkOperator) jOp.getParentOperators().get(1));
+ int leftColCount = jRel.getLeft().getRowType().getFieldCount();
+ ImmutableMap.Builder<String, Integer> b = new ImmutableMap.Builder<String, Integer>();
+ for (Map.Entry<String, LinkedHashMap<String, ColumnInfo>> tableEntry : rr.getRslvMap()
+ .entrySet()) {
+ String table = tableEntry.getKey();
+ LinkedHashMap<String, ColumnInfo> cols = tableEntry.getValue();
+ // Columns from a right-side alias are offset by the width of the left
+ // input, mirroring how the JoinRel row type is laid out.
+ Map<String, Integer> posMap = leftMap;
+ int offset = 0;
+ if (hTree.getRightAliases() != null) {
+ for (String rAlias : hTree.getRightAliases()) {
+ if (table.equals(rAlias)) {
+ posMap = rightMap;
+ offset = leftColCount;
+ break;
+ }
+ }
+ }
+ for (Map.Entry<String, ColumnInfo> colEntry : cols.entrySet()) {
+ ColumnInfo ci = colEntry.getValue();
+ ExprNodeDesc e = jOp.getColumnExprMap().get(ci.getInternalName());
+ String cName = ((ExprNodeColumnDesc) e).getColumn();
+ int pos = posMap.get(cName);
+
+ b.put(ci.getInternalName(), pos + offset);
+ }
+ }
+ opPositionMap.put(jRel, b.build());
+ }
+
+ // For operators that don't change the column layout: reuse parent's map.
+ void propagatePosMap(RelNode node, RelNode parent) {
+ opPositionMap.put(node, opPositionMap.get(parent));
+ }
+
+ // Translates a Hive expression against optiqOP's row type (offset 0).
+ RexNode convertToOptiqExpr(final ExprNodeDesc expr, final RelNode optiqOP, final boolean flatten) {
+ return convertToOptiqExpr(expr, optiqOP, 0, flatten);
+ }
+
+ // Translates a Hive expression; offset shifts input refs (e.g. for the
+ // right side of a join).
+ RexNode convertToOptiqExpr(final ExprNodeDesc expr, final RelNode optiqOP, int offset, final boolean flatten) {
+ ImmutableMap<String, Integer> posMap = opPositionMap.get(optiqOP);
+ RexNodeConverter c = new RexNodeConverter(cluster, optiqOP.getRowType(), posMap, offset, flatten);
+ return c.convert(expr);
+ }
+
+ // Returns the RelNode already built for hiveOp's i-th parent, or null.
+ RelNode getParentNode(Operator<? extends OperatorDesc> hiveOp, int i) {
+ Operator<? extends OperatorDesc> p = hiveOp.getParentOperators().get(i);
+ return p == null ? null : hiveOpToRelNode.get(p);
+ }
+
+ }
+
+ /*
+ * Translates a Hive JoinOperator (its two ReduceSink parents must already
+ * be translated) into a HiveJoinRel, building the equi-join predicate from
+ * the RS key columns and ANDing in any non-key filter expressions.
+ */
+ static class JoinProcessor implements NodeProcessor {
+ @SuppressWarnings("unchecked")
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ Context ctx = (Context) procCtx;
+ HiveRel left = (HiveRel) ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+ HiveRel right = (HiveRel) ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 1);
+ JoinOperator joinOp = (JoinOperator) nd;
+ JoinCondDesc[] jConds = joinOp.getConf().getConds();
+ // Only binary joins are handled here (single join condition).
+ assert jConds.length == 1;
+ HiveJoinRel joinRel = convertJoinOp(ctx, joinOp, jConds[0], left, right);
+ ctx.buildColumnMap(joinOp, joinRel);
+ ctx.hiveOpToRelNode.put(joinOp, joinRel);
+ return joinRel;
+ }
+
+ /*
+ * @todo: cleanup, for now just copied from HiveToOptiqRelConvereter
+ */
+ private HiveJoinRel convertJoinOp(Context ctx, JoinOperator op, JoinCondDesc jc,
+ HiveRel leftRel, HiveRel rightRel) {
+ HiveJoinRel joinRel;
+ Operator<? extends OperatorDesc> leftParent = op.getParentOperators().get(jc.getLeft());
+ Operator<? extends OperatorDesc> rightParent = op.getParentOperators().get(jc.getRight());
+
+ if (leftParent instanceof ReduceSinkOperator && rightParent instanceof ReduceSinkOperator) {
+ List<ExprNodeDesc> leftCols = ((ReduceSinkDesc) (leftParent.getConf())).getKeyCols();
+ List<ExprNodeDesc> rightCols = ((ReduceSinkDesc) (rightParent.getConf())).getKeyCols();
+ RexNode joinPredicate = null;
+ JoinRelType joinType = JoinRelType.INNER;
+ // Right-side input refs are offset past the left input's columns.
+ int rightColOffSet = leftRel.getRowType().getFieldCount();
+
+ // TODO: what about semi join
+ switch (jc.getType()) {
+ case JoinDesc.INNER_JOIN:
+ joinType = JoinRelType.INNER;
+ break;
+ case JoinDesc.LEFT_OUTER_JOIN:
+ joinType = JoinRelType.LEFT;
+ break;
+ case JoinDesc.RIGHT_OUTER_JOIN:
+ joinType = JoinRelType.RIGHT;
+ break;
+ case JoinDesc.FULL_OUTER_JOIN:
+ joinType = JoinRelType.FULL;
+ break;
+ }
+
+ // Build the conjunction of key equalities: left.key[i] = right.key[i].
+ int i = 0;
+ for (ExprNodeDesc expr : leftCols) {
+ List<RexNode> eqExpr = new LinkedList<RexNode>();
+ eqExpr.add(ctx.convertToOptiqExpr(expr, leftRel, 0, false));
+ eqExpr.add(ctx.convertToOptiqExpr(rightCols.get(i), rightRel, rightColOffSet, false));
+
+ RexNode eqOp = ctx.cluster.getRexBuilder().makeCall(SqlStdOperatorTable.EQUALS, eqExpr);
+ i++;
+
+ if (joinPredicate == null) {
+ joinPredicate = eqOp;
+ } else {
+ List<RexNode> conjElements = new LinkedList<RexNode>();
+ conjElements.add(joinPredicate);
+ conjElements.add(eqOp);
+ joinPredicate = ctx.cluster.getRexBuilder().makeCall(SqlStdOperatorTable.AND,
+ conjElements);
+ }
+ }
+
+ // Translate non-joinkey predicate
+ // NOTE(review): if leftCols is empty, joinPredicate is still null here
+ // and would be ANDed below / passed to getJoin -- confirm key columns
+ // are always non-empty for joins reaching this processor.
+ Set<Entry<Byte, List<ExprNodeDesc>>> filterExprSet = op.getConf().getFilters().entrySet();
+ if (!filterExprSet.isEmpty()) {
+ RexNode eqExpr;
+ int colOffSet;
+ RelNode childRel;
+ Operator parentHiveOp;
+ int inputId;
+
+ for (Entry<Byte, List<ExprNodeDesc>> entry : filterExprSet) {
+ inputId = entry.getKey().intValue();
+ if (inputId == 0) {
+ colOffSet = 0;
+ childRel = leftRel;
+ parentHiveOp = leftParent;
+ } else if (inputId == 1) {
+ colOffSet = rightColOffSet;
+ childRel = rightRel;
+ parentHiveOp = rightParent;
+ } else {
+ throw new RuntimeException("Invalid Join Input");
+ }
+
+ for (ExprNodeDesc expr : entry.getValue()) {
+ eqExpr = ctx.convertToOptiqExpr(expr, childRel, colOffSet, false);
+ List<RexNode> conjElements = new LinkedList<RexNode>();
+ conjElements.add(joinPredicate);
+ conjElements.add(eqExpr);
+ joinPredicate = ctx.cluster.getRexBuilder().makeCall(SqlStdOperatorTable.AND,
+ conjElements);
+ }
+ }
+ }
+
+ joinRel = HiveJoinRel.getJoin(ctx.cluster, leftRel, rightRel, joinPredicate, joinType);
+ } else {
+ throw new RuntimeException("Right & Left of Join Condition columns are not equal");
+ }
+
+ return joinRel;
+ }
+ }
+
+ private static int convertExpr(Context ctx, RelNode input, ExprNodeDesc expr,
+ List<RexNode> extraExprs) {
+ final RexNode rex = ctx.convertToOptiqExpr(expr, input, false);
+ final int index;
+ if (rex instanceof RexInputRef) {
+ index = ((RexInputRef) rex).getIndex();
+ } else {
+ index = input.getRowType().getFieldCount() + extraExprs.size();
+ extraExprs.add(rex);
+ }
+ return index;
+ }
+
  /**
   * Converts a Hive AggregationDesc into an Optiq AggregateCall.
   * <p>
   * The aggregation function is looked up in AGG_MAP by its UDAF name; each
   * argument expression is converted to an input-column index via
   * {@link #convertExpr}, appending non-trivial expressions to
   * {@code extraExprs} so the caller can project them below the aggregate.
   *
   * @throws AssertionError if the UDAF has no Optiq mapping in AGG_MAP
   */
  private static AggregateCall convertAgg(Context ctx, AggregationDesc agg, RelNode input,
      ColumnInfo cI, List<RexNode> extraExprs) {
    final Aggregation aggregation = AGG_MAP.get(agg.getGenericUDAFName());
    if (aggregation == null) {
      throw new AssertionError("agg not found: " + agg.getGenericUDAFName());
    }

    List<Integer> argList = new ArrayList<Integer>();
    // Start from the Hive output column's type ...
    RelDataType type = TypeConverter.convert(cI.getType(), ctx.cluster.getTypeFactory());
    // ... but for AVG use the type of its "sum" component.
    if (aggregation.equals(SqlStdOperatorTable.AVG)) {
      type = type.getField("sum", false).getType();
    }
    for (ExprNodeDesc expr : agg.getParameters()) {
      int index = convertExpr(ctx, input, expr, extraExprs);
      argList.add(index);
    }

    /*
     * set the type to the first arg, if there is one; because the RTi set on
     * Aggregation call assumes this is the output type.
     */
    // NOTE(review): when arguments exist this overrides the type computed
    // above (including the AVG "sum" special case) — looks intentional per
    // the comment, but worth confirming.
    if (argList.size() > 0) {
      RexNode rex = ctx.convertToOptiqExpr(agg.getParameters().get(0), input, false);
      type = rex.getType();
    }
    return new AggregateCall(aggregation, agg.getDistinct(), argList, type, null);
  }
+
+ static class FilterProcessor implements NodeProcessor {
+ @SuppressWarnings("unchecked")
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ Context ctx = (Context) procCtx;
+ HiveRel input = (HiveRel) ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+ FilterOperator filterOp = (FilterOperator) nd;
+ RexNode convertedFilterExpr = ctx
+ .convertToOptiqExpr(filterOp.getConf().getPredicate(), input, true);
+
+ // Flatten the condition otherwise Optiq chokes on assertion
+ // (FilterRelBase)
+ if (convertedFilterExpr instanceof RexCall) {
+ RexCall call = (RexCall) convertedFilterExpr;
+ convertedFilterExpr = ctx.cluster.getRexBuilder().makeFlatCall(call.getOperator(),
+ call.getOperands());
+ }
+
+ HiveRel filtRel = new HiveFilterRel(ctx.cluster, ctx.cluster.traitSetOf(HiveRel.CONVENTION),
+ input, convertedFilterExpr);
+ ctx.propagatePosMap(filtRel, input);
+ ctx.hiveOpToRelNode.put(filterOp, filtRel);
+ return filtRel;
+ }
+ }
+
+ static class SelectProcessor implements NodeProcessor {
+ @SuppressWarnings("unchecked")
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ Context ctx = (Context) procCtx;
+ HiveRel inputRelNode = (HiveRel) ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+ SelectOperator selectOp = (SelectOperator) nd;
+
+ List<ExprNodeDesc> colLst = selectOp.getConf().getColList();
+ List<RexNode> optiqColLst = new LinkedList<RexNode>();
+
+ for (ExprNodeDesc colExpr : colLst) {
+ optiqColLst.add(ctx.convertToOptiqExpr(colExpr, inputRelNode, false));
+ }
+
+ /*
+ * Hive treats names that start with '_c' as internalNames; so change the
+ * names so we don't run into this issue when converting back to Hive AST.
+ */
+ List<String> oFieldNames = Lists.transform(selectOp.getConf().getOutputColumnNames(),
+ new Function<String, String>() {
+ public String apply(String hName) {
+ return "_o_" + hName;
+ }
+ });
+
+ HiveRel selRel = HiveProjectRel.create(inputRelNode, optiqColLst, oFieldNames);
+ ctx.buildColumnMap(selectOp, selRel);
+ ctx.hiveOpToRelNode.put(selectOp, selRel);
+ return selRel;
+ }
+ }
+
+ static class LimitProcessor implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ Context ctx = (Context) procCtx;
+ HiveRel input = (HiveRel) ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
+ LimitOperator limitOp = (LimitOperator) nd;
+
+ // in Optiq, a limit is represented as a sort on 0 columns
+ final RexNode fetch;
+ if (limitOp.getConf().getLimit() >= 0) {
+ fetch = ctx.cluster.getRexBuilder().makeExactLiteral(
+ BigDecimal.valueOf(limitOp.getConf().getLimit()));
+ } else {
+ fetch = null;
+ }
+ RelTraitSet traitSet = ctx.cluster.traitSetOf(HiveRel.CONVENTION);
+ RelCollation canonizedCollation = traitSet.canonize(RelCollationImpl.EMPTY);
+ HiveRel sortRel = new HiveSortRel(ctx.cluster, traitSet, input, canonizedCollation, null,
+ fetch);
+ ctx.propagatePosMap(sortRel, input);
+ ctx.hiveOpToRelNode.put(limitOp, sortRel);
+ return sortRel;
+ }
+ }
+
  /**
   * Translates a Hive GroupByOperator into a {@link HiveAggregateRel}.
   * <p>
   * Hive represents a group-by with two operators (map side and reduce side);
   * only the map-side one is translated here — the reduce-side one (whose
   * parent is a ReduceSinkOperator) is passed through unchanged.
   */
  static class GroupByProcessor implements NodeProcessor {
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
      Context ctx = (Context) procCtx;

      HiveRel input = (HiveRel) ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
      GroupByOperator groupByOp = (GroupByOperator) nd;
      RowResolver rr = ctx.sA.getRowResolver(groupByOp);
      ArrayList<ColumnInfo> signature = rr.getRowSchema().getSignature();

      // GroupBy is represented by two operators, one map side and one reduce
      // side. We only translate the map-side one.
      if (groupByOp.getParentOperators().get(0) instanceof ReduceSinkOperator) {
        ctx.hiveOpToRelNode.put(groupByOp, input);
        return input;
      }

      // Convert each grouping key to an input column index; non-trivial key
      // expressions are accumulated in extraExprs for a pre-projection below.
      final List<RexNode> extraExprs = Lists.newArrayList();
      final BitSet groupSet = new BitSet();
      for (ExprNodeDesc key : groupByOp.getConf().getKeys()) {
        int index = convertExpr(ctx, input, key, extraExprs);
        groupSet.set(index);
      }
      // Aggregate output columns follow the keys in the operator's signature,
      // so the i-th aggregator's ColumnInfo starts at index keys.size().
      List<AggregateCall> aggregateCalls = Lists.newArrayList();
      int i = groupByOp.getConf().getKeys().size();
      for (AggregationDesc agg : groupByOp.getConf().getAggregators()) {
        aggregateCalls.add(convertAgg(ctx, agg, input, signature.get(i++), extraExprs));
      }

      // If any key/agg argument was a computed expression, insert a projection
      // of (all existing input fields + the extra expressions) below the
      // aggregate so those expressions become addressable columns.
      if (!extraExprs.isEmpty()) {
        // noinspection unchecked
        input = HiveProjectRel.create(input, CompositeList.of(Lists.transform(input.getRowType()
            .getFieldList(), new Function<RelDataTypeField, RexNode>() {
          public RexNode apply(RelDataTypeField input) {
            return new RexInputRef(input.getIndex(), input.getType());
          }
        }), extraExprs), null);
      }
      try {
        HiveRel aggregateRel = new HiveAggregateRel(ctx.cluster,
            ctx.cluster.traitSetOf(HiveRel.CONVENTION), input, groupSet, aggregateCalls);
        ctx.buildColumnMap(groupByOp, aggregateRel);
        ctx.hiveOpToRelNode.put(groupByOp, aggregateRel);
        return aggregateRel;
      } catch (InvalidRelException e) {
        throw new AssertionError(e); // not possible
      }
    }
  }
+
  /**
   * Translates a Hive ReduceSinkOperator. Only a ReduceSink that implements an
   * ORDER BY (single reducer with a non-empty sort order) becomes a
   * {@link HiveSortRel}; any other ReduceSink is a shuffle artifact and is
   * passed through unchanged.
   */
  static class ReduceSinkProcessor implements NodeProcessor {
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
      Context ctx = (Context) procCtx;
      HiveRel input = (HiveRel) ctx.getParentNode((Operator<? extends OperatorDesc>) nd, 0);
      ReduceSinkOperator sinkOp = (ReduceSinkOperator) nd;

      // It is a sort reducer if and only if the number of reducers is 1.
      final ReduceSinkDesc conf = sinkOp.getConf();
      if (conf.getNumReducers() != 1) {
        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
        ctx.hiveOpToRelNode.put(op, input);
        return input;
      }

      final String order = conf.getOrder(); // "+-" means "ASC, DESC"
      assert order.length() == conf.getKeyCols().size();

      /*
       * numReducers == 1 and order.length() == 0 => a RS for CrossJoin.
       */
      if ( order.length() == 0 ) {
        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
        ctx.hiveOpToRelNode.put(op, input);
        return input;
      }

      // Pair each key column with its direction character and build the field
      // collations; computed key expressions are accumulated in extraExprs.
      final List<RelFieldCollation> fieldCollations = Lists.newArrayList();
      final List<RexNode> extraExprs = Lists.newArrayList();
      for (Pair<ExprNodeDesc, Character> pair : Pair.zip(conf.getKeyCols(),
          Lists.charactersOf(order))) {
        int index = convertExpr(ctx, input, pair.left, extraExprs);
        RelFieldCollation.Direction direction = getDirection(pair.right);
        fieldCollations.add(new RelFieldCollation(index, direction));
      }

      // If any sort key was a computed expression, project (all existing
      // input fields + the extra expressions) below the sort so those
      // expressions become addressable columns.
      if (!extraExprs.isEmpty()) {
        // noinspection unchecked
        input = HiveProjectRel.create(input, CompositeList.of(Lists.transform(input.getRowType()
            .getFieldList(), new Function<RelDataTypeField, RexNode>() {
          public RexNode apply(RelDataTypeField input) {
            return new RexInputRef(input.getIndex(), input.getType());
          }
        }), extraExprs), null);
      }

      RelTraitSet traitSet = ctx.cluster.traitSetOf(HiveRel.CONVENTION);
      RelCollation canonizedCollation = traitSet.canonize(RelCollationImpl.of(fieldCollations));
      HiveRel sortRel = new HiveSortRel(ctx.cluster, traitSet, input, canonizedCollation, null,
          null);
      ctx.propagatePosMap(sortRel, input);
      ctx.hiveOpToRelNode.put(sinkOp, sortRel);

      // REVIEW: Do we need to remove the columns we added due to extraExprs?

      return sortRel;
    }

    /** Maps Hive's order character ('+' = ASC, '-' = DESC) to a collation direction. */
    private RelFieldCollation.Direction getDirection(char c) {
      switch (c) {
      case '+':
        return RelFieldCollation.Direction.ASCENDING;
      case '-':
        return RelFieldCollation.Direction.DESCENDING;
      default:
        throw new AssertionError("unexpected direction " + c);
      }
    }
  }
+
  /**
   * Translates a Hive TableScanOperator into a {@link HiveTableScanRel},
   * wrapping the Hive table and its statistics in a {@link RelOptHiveTable}.
   * Partition columns missing from the column statistics are given synthetic
   * stats (distinct count = number of pruned partitions).
   */
  static class TableScanProcessor implements NodeProcessor {
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
      Context ctx = (Context) procCtx;
      TableScanOperator tableScanOp = (TableScanOperator) nd;
      RowResolver rr = ctx.sA.getRowResolver(tableScanOp);

      List<String> neededCols = new ArrayList<String>(
          tableScanOp.getNeededColumns());
      Statistics stats = tableScanOp.getStatistics();

      try {
        stats = addPartitionColumns(ctx, tableScanOp, tableScanOp.getConf()
            .getAlias(), ctx.sA.getTable(tableScanOp), stats, neededCols);
      } catch (CloneNotSupportedException ce) {
        throw new SemanticException(ce);
      }

      // CBO needs column stats for every referenced column; bail out of the
      // optimization if any are missing.
      if (stats.getColumnStats().size() != neededCols.size()) {
        throw new SemanticException("Incomplete Col stats for table: "
            + tableScanOp.getConf().getAlias());
      }
      RelDataType rowType = TypeConverter.getType(ctx.cluster, rr, neededCols);
      RelOptHiveTable optTable = new RelOptHiveTable(ctx.schema, tableScanOp.getConf().getAlias(),
          rowType, ctx.sA.getTable(tableScanOp), stats);
      TableAccessRelBase tableRel = new HiveTableScanRel(ctx.cluster,
          ctx.cluster.traitSetOf(HiveRel.CONVENTION), optTable, rowType);
      ctx.buildColumnMap(tableScanOp, tableRel);
      ctx.hiveOpToRelNode.put(tableScanOp, tableRel);
      return tableRel;
    }

    /*
     * Add partition columns to needed columns and fake the ColStats for them.
     * For each partition column without real column statistics, a synthetic
     * ColStatistics is created whose distinct count is the number of pruned
     * partitions. Returns a cloned Statistics object when anything was added.
     */
    private Statistics addPartitionColumns(Context ctx,
        TableScanOperator tableScanOp, String tblAlias, Table tbl,
        Statistics stats, List<String> neededCols)
        throws CloneNotSupportedException {
      if (!tbl.isPartitioned()) {
        return stats;
      }
      List<ColStatistics> pStats = new ArrayList<ColStatistics>();
      List<FieldSchema> pCols = tbl.getPartCols();
      for (FieldSchema pC : pCols) {
        neededCols.add(pC.getName());
        ColStatistics cStats = stats.getColumnStatisticsForColumn(tblAlias,
            pC.getName());
        if (cStats == null) {
          PrunedPartitionList partList = ctx.parseCtx.getOpToPartList().get(
              tableScanOp);
          cStats = new ColStatistics(tblAlias, pC.getName(), pC.getType());
          cStats.setCountDistint(partList.getPartitions().size());
          pStats.add(cStats);
        }
      }
      // Clone before mutating so the operator's original stats are untouched.
      if (pStats.size() > 0) {
        stats = stats.clone();
        stats.addToColumnStats(pStats);
      }

      return stats;
    }
  }
+
+ static class DefaultProcessor implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ @SuppressWarnings("unchecked")
+ Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+ Context ctx = (Context) procCtx;
+ RelNode node = (HiveRel) ctx.getParentNode(op, 0);
+ ctx.hiveOpToRelNode.put(op, node);
+ return node;
+ }
+ }
+}
Added: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/RexNodeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/RexNodeConverter.java?rev=1605013&view=auto
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/RexNodeConverter.java (added)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/RexNodeConverter.java Tue Jun 24 06:32:30 2014
@@ -0,0 +1,130 @@
+package org.apache.hadoop.hive.ql.optimizer.optiq.translator;
+
+import java.math.BigDecimal;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.rex.RexBuilder;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.sql.SqlOperator;
+
+import com.google.common.collect.ImmutableMap;
+
+public class RexNodeConverter {
+
+ private final RelOptCluster m_cluster;
+ private final RelDataType m_inpDataType;
+ private final ImmutableMap<String, Integer> m_nameToPosMap;
+ private final int m_offset;
+ private final boolean m_flattenExpr;
+
+ public RexNodeConverter(RelOptCluster cluster, RelDataType inpDataType,
+ ImmutableMap<String, Integer> nameToPosMap, int offset, boolean flattenExpr) {
+ this.m_cluster = cluster;
+ this.m_inpDataType = inpDataType;
+ this.m_nameToPosMap = nameToPosMap;
+ this.m_offset = offset;
+ m_flattenExpr = flattenExpr;
+ }
+
+ public RexNode convert(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeGenericFuncDesc) {
+ return convert((ExprNodeGenericFuncDesc) expr);
+ } else if (expr instanceof ExprNodeConstantDesc) {
+ return convert((ExprNodeConstantDesc) expr);
+ } else if (expr instanceof ExprNodeColumnDesc) {
+ return convert((ExprNodeColumnDesc) expr);
+ } else {
+ throw new RuntimeException("Unsupported Expression");
+ }
+ // TODO: handle a) ExprNodeNullDesc b) ExprNodeFieldDesc c)
+ // ExprNodeColumnListDesc
+ }
+
+ private RexNode convert(final ExprNodeGenericFuncDesc func) {
+ SqlOperator optiqOp = SqlFunctionConverter.getOptiqOperator(func.getGenericUDF());
+ List<RexNode> childRexNodeLst = new LinkedList<RexNode>();
+
+ for (ExprNodeDesc childExpr : func.getChildren()) {
+ childRexNodeLst.add(convert(childExpr));
+ }
+
+ RexNode convertedFilterExpr = m_cluster.getRexBuilder().makeCall(optiqOp, childRexNodeLst);
+ if (m_flattenExpr && convertedFilterExpr instanceof RexCall) {
+ RexCall call = (RexCall) convertedFilterExpr;
+ convertedFilterExpr = m_cluster.getRexBuilder().makeFlatCall(call.getOperator(),
+ call.getOperands());
+ }
+
+ return convertedFilterExpr;
+ }
+
+ protected RexNode convert(ExprNodeColumnDesc col) {
+ int pos = m_nameToPosMap.get(col.getColumn());
+ return m_cluster.getRexBuilder().makeInputRef(m_inpDataType.getFieldList().get(pos).getType(),
+ pos + m_offset);
+ }
+
+ protected RexNode convert(ExprNodeConstantDesc literal) {
+ RexBuilder rexBuilder = m_cluster.getRexBuilder();
+ RelDataTypeFactory dtFactory = rexBuilder.getTypeFactory();
+ PrimitiveTypeInfo hiveType = (PrimitiveTypeInfo) literal.getTypeInfo();
+ RelDataType optiqDataType = TypeConverter.convert(hiveType, dtFactory);
+
+ PrimitiveCategory hiveTypeCategory = hiveType.getPrimitiveCategory();
+ RexNode optiqLiteral = null;
+ Object value = literal.getValue();
+
+ // TODO: Verify if we need to use ConstantObjectInspector to unwrap data
+ switch (hiveTypeCategory) {
+ case BOOLEAN:
+ optiqLiteral = rexBuilder.makeLiteral(((Boolean) value).booleanValue());
+ break;
+ case BYTE:
+ optiqLiteral = rexBuilder.makeExactLiteral(new BigDecimal((Short) value));
+ break;
+ case SHORT:
+ optiqLiteral = rexBuilder.makeExactLiteral(new BigDecimal((Short) value));
+ break;
+ case INT:
+ optiqLiteral = rexBuilder.makeExactLiteral(new BigDecimal((Integer) value));
+ break;
+ case LONG:
+ optiqLiteral = rexBuilder.makeBigintLiteral(new BigDecimal((Long) value));
+ break;
+ // TODO: is Decimal an exact numeric or approximate numeric?
+ case DECIMAL:
+ optiqLiteral = rexBuilder.makeExactLiteral((BigDecimal) value);
+ break;
+ case FLOAT:
+ optiqLiteral = rexBuilder.makeApproxLiteral(new BigDecimal((Float) value), optiqDataType);
+ break;
+ case DOUBLE:
+ optiqLiteral = rexBuilder.makeApproxLiteral(new BigDecimal((Double) value), optiqDataType);
+ break;
+ case STRING:
+ optiqLiteral = rexBuilder.makeLiteral((String) value);
+ break;
+ case DATE:
+ case TIMESTAMP:
+ case BINARY:
+ case VOID:
+ case UNKNOWN:
+ default:
+ throw new RuntimeException("UnSupported Literal");
+ }
+
+ return optiqLiteral;
+ }
+
+}