Posted to commits@hive.apache.org by am...@apache.org on 2013/04/17 09:29:46 UTC
svn commit: r1468783 [6/16] - in /hive/branches/HIVE-4115: ./
beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/
beeline/src/test/org/apache/ beeline/src/test/org/apache/hive/
beeline/src/test/org/apache/hive/beeline/ beeline/src/test/org/...
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Wed Apr 17 07:29:38 2013
@@ -18,15 +18,17 @@
package org.apache.hadoop.hive.ql.optimizer;
+import java.io.Serializable;
+import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Stack;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ExtractOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -52,32 +54,54 @@ import org.apache.hadoop.hive.ql.parse.O
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOIN;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONMINREDUCER;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST;
/**
- * If two reducer sink operators share the same partition/sort columns, we
- * should merge them. This should happen after map join optimization because map
+ * If two reducer sink operators share the same partition/sort columns and order,
+ * they can be merged. This should happen after map join optimization because map
* join optimization will remove reduce sink operators.
*/
public class ReduceSinkDeDuplication implements Transform{
+ private static final String RS = ReduceSinkOperator.getOperatorName();
+ private static final String GBY = GroupByOperator.getOperatorName();
+ private static final String JOIN = JoinOperator.getOperatorName();
+
protected ParseContext pGraphContext;
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
pGraphContext = pctx;
- // generate pruned column list for all relevant operators
+ // generate pruned column list for all relevant operators
ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
+ boolean mergeJoins = !pctx.getConf().getBoolVar(HIVECONVERTJOIN) &&
+ !pctx.getConf().getBoolVar(HIVECONVERTJOINNOCONDITIONALTASK);
+
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1",
- ReduceSinkOperator.getOperatorName() + "%.*" + ReduceSinkOperator.getOperatorName() + "%"),
- ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+ opRules.put(new RuleRegExp("R1", RS + "%.*%" + RS + "%"),
+ ReduceSinkDeduplicateProcFactory.getReducerReducerProc());
+ opRules.put(new RuleRegExp("R2", RS + "%" + GBY + "%.*%" + RS + "%"),
+ ReduceSinkDeduplicateProcFactory.getGroupbyReducerProc());
+ if (mergeJoins) {
+ opRules.put(new RuleRegExp("R3", JOIN + "%.*%" + RS + "%"),
+ ReduceSinkDeduplicateProcFactory.getJoinReducerProc());
+ }
+ // TODO RS+JOIN
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
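
The rule patterns above are effectively regular expressions matched against the walker's stack of operator names joined with "%". A minimal standalone illustration of what R2 accepts, using plain java.util.regex (the operator paths are hypothetical):

    import java.util.regex.Pattern;

    public class RuleMatchSketch {
      public static void main(String[] args) {
        String RS = "RS", GBY = "GBY";
        // R2 above: a child RS preceded by a GBY that itself follows a parent RS.
        Pattern r2 = Pattern.compile(RS + "%" + GBY + "%.*%" + RS + "%");
        System.out.println(r2.matcher("RS%GBY%SEL%RS%").find()); // true: pRS-pGBY-SEL-cRS
        System.out.println(r2.matcher("TS%SEL%RS%").find());     // false: no RS-GBY prefix
      }
    }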
@@ -92,23 +116,27 @@ public class ReduceSinkDeDuplication imp
return pGraphContext;
}
- class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx{
+ class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx {
+
ParseContext pctx;
- List<ReduceSinkOperator> rejectedRSList;
+ boolean trustScript;
+ // min reducer num for merged RS (to avoid a query containing "order by" being executed by a single reducer)
+ int minReducer;
+ Set<Operator<?>> removedOps;
public ReduceSinkDeduplicateProcCtx(ParseContext pctx) {
- rejectedRSList = new ArrayList<ReduceSinkOperator>();
+ removedOps = new HashSet<Operator<?>>();
+ trustScript = pctx.getConf().getBoolVar(HIVESCRIPTOPERATORTRUST);
+ minReducer = pctx.getConf().getIntVar(HIVEOPTREDUCEDEDUPLICATIONMINREDUCER);
this.pctx = pctx;
}
- public boolean contains (ReduceSinkOperator rsOp) {
- return rejectedRSList.contains(rsOp);
+ public boolean contains(Operator<?> rsOp) {
+ return removedOps.contains(rsOp);
}
- public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
- if (!rejectedRSList.contains(rsOp)) {
- rejectedRSList.add(rsOp);
- }
+ public boolean addRemovedOperator(Operator<?> rsOp) {
+ return removedOps.add(rsOp);
}
public ParseContext getPctx() {
@@ -120,355 +148,598 @@ public class ReduceSinkDeDuplication imp
}
}
-
static class ReduceSinkDeduplicateProcFactory {
-
public static NodeProcessor getReducerReducerProc() {
return new ReducerReducerProc();
}
+ public static NodeProcessor getGroupbyReducerProc() {
+ return new GroupbyReducerProc();
+ }
+
+ public static NodeProcessor getJoinReducerProc() {
+ return new JoinReducerProc();
+ }
+
public static NodeProcessor getDefaultProc() {
return new DefaultProc();
}
+ }
- /*
- * do nothing.
- */
- static class DefaultProc implements NodeProcessor {
- @Override
- public Object process(Node nd, Stack<Node> stack,
- NodeProcessorCtx procCtx, Object... nodeOutputs)
- throws SemanticException {
- return null;
- }
+ /*
+ * do nothing.
+ */
+ static class DefaultProc implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ return null;
}
+ }
- static class ReducerReducerProc implements NodeProcessor {
- @Override
- public Object process(Node nd, Stack<Node> stack,
- NodeProcessorCtx procCtx, Object... nodeOutputs)
- throws SemanticException {
- ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
- ReduceSinkOperator childReduceSink = (ReduceSinkOperator)nd;
+ public abstract static class AbstractReducerReducerProc implements NodeProcessor {
- if(ctx.contains(childReduceSink)) {
- return null;
- }
+ ReduceSinkDeduplicateProcCtx dedupCtx;
- List<Operator<? extends OperatorDesc>> childOp =
- childReduceSink.getChildOperators();
- if (childOp != null && childOp.size() == 1) {
- Operator<? extends OperatorDesc> child = childOp.get(0);
- if (child instanceof GroupByOperator || child instanceof JoinOperator) {
- ctx.addRejectedReduceSinkOperator(childReduceSink);
- return null;
- }
- }
+ protected boolean trustScript() {
+ return dedupCtx.trustScript;
+ }
- ParseContext pGraphContext = ctx.getPctx();
- HashMap<String, String> childColumnMapping =
- getPartitionAndKeyColumnMapping(childReduceSink);
- ReduceSinkOperator parentRS = null;
- parentRS = findSingleParentReduceSink(childReduceSink, pGraphContext);
- if (parentRS == null) {
- ctx.addRejectedReduceSinkOperator(childReduceSink);
- return null;
- }
- HashMap<String, String> parentColumnMapping = getPartitionAndKeyColumnMapping(parentRS);
- Operator<? extends OperatorDesc> stopBacktrackFlagOp = null;
- if (parentRS.getParentOperators() == null
- || parentRS.getParentOperators().size() == 0) {
- stopBacktrackFlagOp = parentRS;
- } else if (parentRS.getParentOperators().size() != 1) {
- return null;
- } else {
- stopBacktrackFlagOp = parentRS.getParentOperators().get(0);
- }
+ protected int minReducer() {
+ return dedupCtx.minReducer;
+ }
- boolean succeed = backTrackColumnNames(childColumnMapping, childReduceSink, stopBacktrackFlagOp, pGraphContext);
- if (!succeed) {
- return null;
- }
- succeed = backTrackColumnNames(parentColumnMapping, parentRS, stopBacktrackFlagOp, pGraphContext);
- if (!succeed) {
- return null;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ dedupCtx = (ReduceSinkDeduplicateProcCtx) procCtx;
+ if (dedupCtx.contains((Operator<?>) nd)) {
+ return false;
+ }
+ ReduceSinkOperator cRS = (ReduceSinkOperator) nd;
+ Operator<?> child = getSingleChild(cRS);
+ if (child instanceof JoinOperator) {
+ return false; // not supported
+ }
+ ParseContext pctx = dedupCtx.getPctx();
+ if (child instanceof GroupByOperator) {
+ GroupByOperator cGBY = (GroupByOperator) child;
+ if (!hasGroupingSet(cRS) && !cGBY.getConf().isGroupingSetsPresent()) {
+ return process(cRS, cGBY, pctx);
}
+ return false;
+ }
+ if (child instanceof ExtractOperator) {
+ return process(cRS, pctx);
+ }
+ return false;
+ }
- boolean same = compareReduceSink(childReduceSink, parentRS, childColumnMapping, parentColumnMapping);
- if (!same) {
- return null;
- }
- replaceReduceSinkWithSelectOperator(childReduceSink, pGraphContext);
- return null;
+ private boolean hasGroupingSet(ReduceSinkOperator cRS) {
+ GroupByOperator cGBYm = getSingleParent(cRS, GroupByOperator.class);
+ if (cGBYm != null && cGBYm.getConf().isGroupingSetsPresent()) {
+ return true;
+ }
+ return false;
+ }
+
+ protected Operator<?> getSingleParent(Operator<?> operator) {
+ List<Operator<?>> parents = operator.getParentOperators();
+ if (parents != null && parents.size() == 1) {
+ return parents.get(0);
+ }
+ return null;
+ }
+
+ protected Operator<?> getSingleChild(Operator<?> operator) {
+ List<Operator<?>> children = operator.getChildOperators();
+ if (children != null && children.size() == 1) {
+ return children.get(0);
}
+ return null;
+ }
- private void replaceReduceSinkWithSelectOperator(
- ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException {
- List<Operator<? extends OperatorDesc>> parentOp =
- childReduceSink.getParentOperators();
- List<Operator<? extends OperatorDesc>> childOp =
- childReduceSink.getChildOperators();
+ protected <T> T getSingleParent(Operator<?> operator, Class<T> type) {
+ Operator<?> parent = getSingleParent(operator);
+ return type.isInstance(parent) ? (T)parent : null;
+ }
- Operator<? extends OperatorDesc> oldParent = childReduceSink;
+ protected abstract Object process(ReduceSinkOperator cRS, ParseContext context)
+ throws SemanticException;
- if (childOp != null && childOp.size() == 1
- && ((childOp.get(0)) instanceof ExtractOperator)) {
- oldParent = childOp.get(0);
- childOp = childOp.get(0).getChildOperators();
- }
+ protected abstract Object process(ReduceSinkOperator cRS, GroupByOperator cGBY,
+ ParseContext context) throws SemanticException;
- Operator<? extends OperatorDesc> input = parentOp.get(0);
- input.getChildOperators().clear();
+ protected Operator<?> getStartForGroupBy(ReduceSinkOperator cRS) {
+ Operator<? extends Serializable> parent = getSingleParent(cRS);
+ return parent instanceof GroupByOperator ? parent : cRS; // skip map-aggr GBY
+ }
- RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
+ // for the JOIN-RS case, it is generally not possible to merge if the child has
+ // more key/partition columns than its parents
+ protected boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minReducer)
+ throws SemanticException {
+ List<Operator<?>> parents = pJoin.getParentOperators();
+ ReduceSinkOperator[] pRSs = parents.toArray(new ReduceSinkOperator[parents.size()]);
+ ReduceSinkDesc cRSc = cRS.getConf();
+ ReduceSinkDesc pRS0c = pRSs[0].getConf();
+ if (cRSc.getKeyCols().size() > pRS0c.getKeyCols().size()) {
+ return false;
+ }
+ if (cRSc.getPartitionCols().size() > pRS0c.getPartitionCols().size()) {
+ return false;
+ }
+ Integer moveReducerNumTo = checkNumReducer(cRSc.getNumReducers(), pRS0c.getNumReducers());
+ if (moveReducerNumTo == null ||
+ moveReducerNumTo > 0 && cRSc.getNumReducers() < minReducer) {
+ return false;
+ }
- ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
- ArrayList<String> outputs = new ArrayList<String>();
- List<String> outputCols = childReduceSink.getConf().getOutputValueColumnNames();
- RowResolver outputRS = new RowResolver();
+ Integer moveRSOrderTo = checkOrder(cRSc.getOrder(), pRS0c.getOrder());
+ if (moveRSOrderTo == null) {
+ return false;
+ }
- Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ boolean[] sorted = getSortedTags(pJoin);
- for (int i = 0; i < outputCols.size(); i++) {
- String internalName = outputCols.get(i);
- String[] nm = inputRR.reverseLookup(internalName);
- ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]);
- ExprNodeDesc colDesc = childReduceSink.getConf().getValueCols().get(i);
- exprs.add(colDesc);
- outputs.add(internalName);
- outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo
- .getType(), nm[0], valueInfo.getIsVirtualCol(), valueInfo.isHiddenVirtualCol()));
- colExprMap.put(internalName, colDesc);
+ int cKeySize = cRSc.getKeyCols().size();
+ for (int i = 0; i < cKeySize; i++) {
+ ExprNodeDesc cexpr = cRSc.getKeyCols().get(i);
+ ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+ for (int tag = 0; tag < pRSs.length; tag++) {
+ pexprs[tag] = pRSs[tag].getConf().getKeyCols().get(i);
}
+ int found = indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+ if (found < 0) {
+ return false;
+ }
+ }
+ int cPartSize = cRSc.getPartitionCols().size();
+ for (int i = 0; i < cPartSize; i++) {
+ ExprNodeDesc cexpr = cRSc.getPartitionCols().get(i);
+ ExprNodeDesc[] pexprs = new ExprNodeDesc[pRSs.length];
+ for (int tag = 0; tag < pRSs.length; tag++) {
+ pexprs[tag] = pRSs[tag].getConf().getPartitionCols().get(i);
+ }
+ int found = indexOf(cexpr, pexprs, cRS, pRSs, sorted);
+ if (found < 0) {
+ return false;
+ }
+ }
- SelectDesc select = new SelectDesc(exprs, outputs, false);
+ if (moveReducerNumTo > 0) {
+ for (ReduceSinkOperator pRS : pRSs) {
+ pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
+ }
+ }
+ return true;
+ }
- SelectOperator sel = (SelectOperator) putOpInsertMap(
- OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
- .getColumnInfos()), input), inputRR, pGraphContext);
+ private boolean[] getSortedTags(JoinOperator joinOp) {
+ boolean[] result = new boolean[joinOp.getParentOperators().size()];
+ for (int tag = 0; tag < result.length; tag++) {
+ result[tag] = isSortedTag(joinOp, tag);
+ }
+ return result;
+ }
- sel.setColumnExprMap(colExprMap);
+ private boolean isSortedTag(JoinOperator joinOp, int tag) {
+ for (JoinCondDesc cond : joinOp.getConf().getConds()) {
+ switch (cond.getType()) {
+ case JoinDesc.LEFT_OUTER_JOIN:
+ if (cond.getRight() == tag) {
+ return false;
+ }
+ continue;
+ case JoinDesc.RIGHT_OUTER_JOIN:
+ if (cond.getLeft() == tag) {
+ return false;
+ }
+ continue;
+ case JoinDesc.FULL_OUTER_JOIN:
+ if (cond.getLeft() == tag || cond.getRight() == tag) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
- // Insert the select operator in between.
- sel.setChildOperators(childOp);
- for (Operator<? extends OperatorDesc> ch : childOp) {
- ch.replaceParent(oldParent, sel);
+ private int indexOf(ExprNodeDesc cexpr, ExprNodeDesc[] pexprs, Operator child,
+ Operator[] parents, boolean[] sorted) throws SemanticException {
+ for (int tag = 0; tag < parents.length; tag++) {
+ if (sorted[tag] &&
+ pexprs[tag].isSame(ExprNodeDescUtils.backtrack(cexpr, child, parents[tag]))) {
+ return tag;
}
+ }
+ return -1;
+ }
+ protected boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+ throws SemanticException {
+ int[] result = checkStatus(cRS, pRS, minReducer);
+ if (result == null) {
+ return false;
+ }
+ if (result[0] > 0) {
+ ArrayList<ExprNodeDesc> childKCs = cRS.getConf().getKeyCols();
+ pRS.getConf().setKeyCols(ExprNodeDescUtils.backtrack(childKCs, cRS, pRS));
+ }
+ if (result[1] > 0) {
+ ArrayList<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+ pRS.getConf().setPartitionCols(ExprNodeDescUtils.backtrack(childPCs, cRS, pRS));
+ }
+ if (result[2] > 0) {
+ pRS.getConf().setOrder(cRS.getConf().getOrder());
+ }
+ if (result[3] > 0) {
+ pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
}
+ return true;
+ }
- private Operator<? extends OperatorDesc> putOpInsertMap(
- Operator<? extends OperatorDesc> op, RowResolver rr, ParseContext pGraphContext) {
- OpParseContext ctx = new OpParseContext(rr);
- pGraphContext.getOpParseCtx().put(op, ctx);
- return op;
+ // per attribute: 1 = move the child's setting up to the parent, -1 = the
+ // parent's setting already covers the child's, 0 = identical; null = no merge
+ private int[] checkStatus(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+ throws SemanticException {
+ ReduceSinkDesc cConf = cRS.getConf();
+ ReduceSinkDesc pConf = pRS.getConf();
+ Integer moveRSOrderTo = checkOrder(cConf.getOrder(), pConf.getOrder());
+ if (moveRSOrderTo == null) {
+ return null;
+ }
+ Integer moveReducerNumTo = checkNumReducer(cConf.getNumReducers(), pConf.getNumReducers());
+ if (moveReducerNumTo == null ||
+ moveReducerNumTo > 0 && cConf.getNumReducers() < minReducer) {
+ return null;
+ }
+ List<ExprNodeDesc> ckeys = cConf.getKeyCols();
+ List<ExprNodeDesc> pkeys = pConf.getKeyCols();
+ Integer moveKeyColTo = checkExprs(ckeys, pkeys, cRS, pRS);
+ if (moveKeyColTo == null) {
+ return null;
}
+ List<ExprNodeDesc> cpars = cConf.getPartitionCols();
+ List<ExprNodeDesc> ppars = pConf.getPartitionCols();
+ Integer movePartitionColTo = checkExprs(cpars, ppars, cRS, pRS);
+ if (movePartitionColTo == null) {
+ return null;
+ }
+ return new int[] {moveKeyColTo, movePartitionColTo, moveRSOrderTo, moveReducerNumTo};
+ }
- private boolean compareReduceSink(ReduceSinkOperator childReduceSink,
- ReduceSinkOperator parentRS,
- HashMap<String, String> childColumnMapping,
- HashMap<String, String> parentColumnMapping) {
+ private Integer checkExprs(List<ExprNodeDesc> ckeys, List<ExprNodeDesc> pkeys,
+ ReduceSinkOperator cRS, ReduceSinkOperator pRS) throws SemanticException {
+ Integer moveKeyColTo = 0;
+ if (ckeys == null || ckeys.isEmpty()) {
+ if (pkeys != null && !pkeys.isEmpty()) {
+ moveKeyColTo = -1;
+ }
+ } else {
+ if (pkeys == null || pkeys.isEmpty()) {
+ for (ExprNodeDesc ckey : ckeys) {
+ if (ExprNodeDescUtils.backtrack(ckey, cRS, pRS) == null) {
+ return null;
+ }
+ }
+ moveKeyColTo = 1;
+ } else {
+ moveKeyColTo = sameKeys(ckeys, pkeys, cRS, pRS);
+ }
+ }
+ return moveKeyColTo;
+ }
- ArrayList<ExprNodeDesc> childPartitionCols = childReduceSink.getConf().getPartitionCols();
- ArrayList<ExprNodeDesc> parentPartitionCols = parentRS.getConf().getPartitionCols();
+ protected Integer sameKeys(List<ExprNodeDesc> cexprs, List<ExprNodeDesc> pexprs,
+ Operator<?> child, Operator<?> parent) throws SemanticException {
+ int common = Math.min(cexprs.size(), pexprs.size());
+ int limit = Math.max(cexprs.size(), pexprs.size());
+ int i = 0;
+ for (; i < common; i++) {
+ ExprNodeDesc pexpr = pexprs.get(i);
+ ExprNodeDesc cexpr = ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent);
+ if (!pexpr.isSame(cexpr)) {
+ return null;
+ }
+ }
+ for (; i < limit; i++) {
+ if (cexprs.size() > pexprs.size()) {
+ if (ExprNodeDescUtils.backtrack(cexprs.get(i), child, parent) == null) {
+ return null;
+ }
+ }
+ }
+ return Integer.valueOf(cexprs.size()).compareTo(pexprs.size());
+ }
- boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping,
- childPartitionCols, parentPartitionCols);
- if (!ret) {
- return false;
+ protected Integer checkOrder(String corder, String porder) {
+ if (corder == null || corder.trim().equals("")) {
+ if (porder == null || porder.trim().equals("")) {
+ return 0;
}
+ return -1;
+ }
+ if (porder == null || porder.trim().equals("")) {
+ return 1;
+ }
+ corder = corder.trim();
+ porder = porder.trim();
+ int target = Math.min(corder.length(), porder.length());
+ if (!corder.substring(0, target).equals(porder.substring(0, target))) {
+ return null;
+ }
+ return Integer.valueOf(corder.length()).compareTo(porder.length());
+ }
- ArrayList<ExprNodeDesc> childReduceKeyCols = childReduceSink.getConf().getKeyCols();
- ArrayList<ExprNodeDesc> parentReduceKeyCols = parentRS.getConf().getKeyCols();
- ret = compareExprNodes(childColumnMapping, parentColumnMapping,
- childReduceKeyCols, parentReduceKeyCols);
- if (!ret) {
- return false;
+ protected Integer checkNumReducer(int creduce, int preduce) {
+ if (creduce < 0) {
+ if (preduce < 0) {
+ return 0;
}
+ return -1;
+ }
+ if (preduce < 0) {
+ return 1;
+ }
+ if (creduce != preduce) {
+ return null;
+ }
+ return 0;
+ }
- String childRSOrder = childReduceSink.getConf().getOrder();
- String parentRSOrder = parentRS.getConf().getOrder();
- boolean moveChildRSOrderToParent = false;
- //move child reduce sink's order to the parent reduce sink operator.
- if (childRSOrder != null && !(childRSOrder.trim().equals(""))) {
- if (parentRSOrder == null
- || !childRSOrder.trim().equals(parentRSOrder.trim())) {
- return false;
- }
- } else {
- if(parentRSOrder == null || parentRSOrder.trim().equals("")) {
- moveChildRSOrderToParent = true;
- }
+ protected <T extends Operator<?>> T findPossibleParent(Operator<?> start, Class<T> target,
+ boolean trustScript) throws SemanticException {
+ T[] parents = findPossibleParents(start, target, trustScript);
+ return parents != null && parents.length == 1 ? parents[0] : null;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T extends Operator<?>> T[] findPossibleParents(Operator<?> start, Class<T> target,
+ boolean trustScript) {
+ Operator<?> cursor = getSingleParent(start);
+ for (; cursor != null; cursor = getSingleParent(cursor)) {
+ if (target.isAssignableFrom(cursor.getClass())) {
+ T[] array = (T[]) Array.newInstance(target, 1);
+ array[0] = (T) cursor;
+ return array;
+ }
+ if (cursor instanceof JoinOperator) {
+ return findParents((JoinOperator) cursor, target);
}
+ if (cursor instanceof ScriptOperator && !trustScript) {
+ return null;
+ }
+ if (!(cursor instanceof SelectOperator
+ || cursor instanceof FilterOperator
+ || cursor instanceof ExtractOperator
+ || cursor instanceof ForwardOperator
+ || cursor instanceof ScriptOperator
+ || cursor instanceof ReduceSinkOperator)) {
+ return null;
+ }
+ }
+ return null;
+ }
- int childNumReducers = childReduceSink.getConf().getNumReducers();
- int parentNumReducers = parentRS.getConf().getNumReducers();
- boolean moveChildReducerNumToParent = false;
- //move child reduce sink's number reducers to the parent reduce sink operator.
- if (childNumReducers != parentNumReducers) {
- if (childNumReducers == -1) {
- //do nothing.
- } else if (parentNumReducers == -1) {
- //set childNumReducers in the parent reduce sink operator.
- moveChildReducerNumToParent = true;
- } else {
- return false;
+ @SuppressWarnings("unchecked")
+ private <T extends Operator<?>> T[] findParents(JoinOperator join, Class<T> target) {
+ List<Operator<?>> parents = join.getParentOperators();
+ T[] result = (T[]) Array.newInstance(target, parents.size());
+ for (int tag = 0; tag < result.length; tag++) {
+ Operator<?> cursor = parents.get(tag);
+ for (; cursor != null; cursor = getSingleParent(cursor)) {
+ if (target.isAssignableFrom(cursor.getClass())) {
+ result[tag] = (T) cursor;
+ break;
}
}
-
- if(moveChildRSOrderToParent) {
- parentRS.getConf().setOrder(childRSOrder);
+ if (result[tag] == null) {
+ throw new IllegalStateException("failed to find " + target.getSimpleName()
+ + " from " + join + " on tag " + tag);
}
+ }
+ return result;
+ }
- if(moveChildReducerNumToParent) {
- parentRS.getConf().setNumReducers(childNumReducers);
- }
+ protected SelectOperator replaceReduceSinkWithSelectOperator(ReduceSinkOperator childRS,
+ ParseContext context) throws SemanticException {
+ SelectOperator select = replaceOperatorWithSelect(childRS, context);
+ select.getConf().setOutputColumnNames(childRS.getConf().getOutputValueColumnNames());
+ select.getConf().setColList(childRS.getConf().getValueCols());
+ return select;
+ }
- return true;
+ private SelectOperator replaceOperatorWithSelect(Operator<?> operator, ParseContext context)
+ throws SemanticException {
+ RowResolver inputRR = context.getOpParseCtx().get(operator).getRowResolver();
+ SelectDesc select = new SelectDesc(null, null);
+
+ Operator<?> parent = getSingleParent(operator);
+ Operator<?> child = getSingleChild(operator);
+
+ parent.getChildOperators().clear();
+
+ SelectOperator sel = (SelectOperator) putOpInsertMap(
+ OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
+ .getColumnInfos()), parent), inputRR, context);
+
+ sel.setColumnExprMap(operator.getColumnExprMap());
+
+ sel.setChildOperators(operator.getChildOperators());
+ for (Operator<? extends Serializable> ch : operator.getChildOperators()) {
+ ch.replaceParent(operator, sel);
+ }
+ if (child instanceof ExtractOperator) {
+ removeOperator(child, getSingleChild(child), sel, context);
+ dedupCtx.addRemovedOperator(child);
}
+ operator.setChildOperators(null);
+ operator.setParentOperators(null);
+ dedupCtx.addRemovedOperator(operator);
+ return sel;
+ }
- private boolean compareExprNodes(HashMap<String, String> childColumnMapping,
- HashMap<String, String> parentColumnMapping,
- ArrayList<ExprNodeDesc> childColExprs,
- ArrayList<ExprNodeDesc> parentColExprs) {
+ protected void removeReduceSinkForGroupBy(ReduceSinkOperator cRS, GroupByOperator cGBYr,
+ ParseContext context) throws SemanticException {
- boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
- boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;
+ Operator<?> parent = getSingleParent(cRS);
- if (childEmpty) { //both empty
- return true;
- }
+ if (parent instanceof GroupByOperator) {
+ GroupByOperator cGBYm = (GroupByOperator) parent;
- //child not empty here
- if (parentEmpty) { // child not empty, but parent empty
- return false;
+ cGBYr.getConf().setKeys(cGBYm.getConf().getKeys());
+ cGBYr.getConf().setAggregators(cGBYm.getConf().getAggregators());
+ for (AggregationDesc aggr : cGBYm.getConf().getAggregators()) {
+ aggr.setMode(GenericUDAFEvaluator.Mode.COMPLETE);
}
-
- if (childColExprs.size() != parentColExprs.size()) {
- return false;
+ cGBYr.setColumnExprMap(cGBYm.getColumnExprMap());
+ cGBYr.setSchema(cGBYm.getSchema());
+ RowResolver resolver = context.getOpParseCtx().get(cGBYm).getRowResolver();
+ context.getOpParseCtx().get(cGBYr).setRowResolver(resolver);
+ } else {
+ cGBYr.getConf().setKeys(ExprNodeDescUtils.backtrack(cGBYr.getConf().getKeys(), cGBYr, cRS));
+ for (AggregationDesc aggr : cGBYr.getConf().getAggregators()) {
+ aggr.setParameters(ExprNodeDescUtils.backtrack(aggr.getParameters(), cGBYr, cRS));
}
- int i = 0;
- while (i < childColExprs.size()) {
- ExprNodeDesc childExpr = childColExprs.get(i);
- ExprNodeDesc parentExpr = parentColExprs.get(i);
-
- if ((childExpr instanceof ExprNodeColumnDesc)
- && (parentExpr instanceof ExprNodeColumnDesc)) {
- String childCol = childColumnMapping
- .get(((ExprNodeColumnDesc) childExpr).getColumn());
- String parentCol = parentColumnMapping
- .get(((ExprNodeColumnDesc) childExpr).getColumn());
- if (!childCol.equals(parentCol)) {
- return false;
- }
- } else {
- return false;
+ Map<String, ExprNodeDesc> oldMap = cGBYr.getColumnExprMap();
+ RowResolver oldRR = context.getOpParseCtx().get(cGBYr).getRowResolver();
+
+ Map<String, ExprNodeDesc> newMap = new HashMap<String, ExprNodeDesc>();
+ RowResolver newRR = new RowResolver();
+
+ List<String> outputCols = cGBYr.getConf().getOutputColumnNames();
+ for (int i = 0; i < outputCols.size(); i++) {
+ String colName = outputCols.get(i);
+ String[] nm = oldRR.reverseLookup(colName);
+ ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
+ newRR.put(nm[0], nm[1], colInfo);
+ ExprNodeDesc colExpr = ExprNodeDescUtils.backtrack(oldMap.get(colName), cGBYr, cRS);
+ if (colExpr != null) {
+ newMap.put(colInfo.getInternalName(), colExpr);
}
- i++;
}
- return true;
+ cGBYr.setColumnExprMap(newMap);
+ cGBYr.setSchema(new RowSchema(newRR.getColumnInfos()));
+ context.getOpParseCtx().get(cGBYr).setRowResolver(newRR);
}
+ cGBYr.getConf().setMode(GroupByDesc.Mode.COMPLETE);
- /*
- * back track column names to find their corresponding original column
- * names. Only allow simple operators like 'select column' or filter.
- */
- private boolean backTrackColumnNames(
- HashMap<String, String> columnMapping,
- ReduceSinkOperator reduceSink,
- Operator<? extends OperatorDesc> stopBacktrackFlagOp,
- ParseContext pGraphContext) {
- Operator<? extends OperatorDesc> startOperator = reduceSink;
- while (startOperator != null && startOperator != stopBacktrackFlagOp) {
- startOperator = startOperator.getParentOperators().get(0);
- Map<String, ExprNodeDesc> colExprMap = startOperator.getColumnExprMap();
- if(colExprMap == null || colExprMap.size()==0) {
- continue;
- }
- Iterator<String> keyIter = columnMapping.keySet().iterator();
- while (keyIter.hasNext()) {
- String key = keyIter.next();
- String oldCol = columnMapping.get(key);
- ExprNodeDesc exprNode = colExprMap.get(oldCol);
- if(exprNode instanceof ExprNodeColumnDesc) {
- String col = ((ExprNodeColumnDesc)exprNode).getColumn();
- columnMapping.put(key, col);
- } else {
- return false;
- }
- }
- }
+ removeOperator(cRS, cGBYr, parent, context);
+ dedupCtx.addRemovedOperator(cRS);
+
+ if (parent instanceof GroupByOperator) {
+ removeOperator(parent, cGBYr, getSingleParent(parent), context);
+ dedupCtx.addRemovedOperator(cGBYr);
+ }
+ }
+
+ private void removeOperator(Operator<?> target, Operator<?> child, Operator<?> parent,
+ ParseContext context) {
+ for (Operator<?> aparent : target.getParentOperators()) {
+ aparent.replaceChild(target, child);
+ }
+ for (Operator<?> achild : target.getChildOperators()) {
+ achild.replaceParent(target, parent);
+ }
+ target.setChildOperators(null);
+ target.setParentOperators(null);
+ context.getOpParseCtx().remove(target);
+ }
+
+ private Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
+ ParseContext context) {
+ OpParseContext ctx = new OpParseContext(rr);
+ context.getOpParseCtx().put(op, ctx);
+ return op;
+ }
+ }
+ static class GroupbyReducerProc extends AbstractReducerReducerProc {
+
+ // pRS-pGBY-cRS
+ public Object process(ReduceSinkOperator cRS, ParseContext context)
+ throws SemanticException {
+ GroupByOperator pGBY = findPossibleParent(cRS, GroupByOperator.class, trustScript());
+ if (pGBY == null) {
+ return false;
+ }
+ ReduceSinkOperator pRS = findPossibleParent(pGBY, ReduceSinkOperator.class, trustScript());
+ if (pRS != null && merge(cRS, pRS, minReducer())) {
+ replaceReduceSinkWithSelectOperator(cRS, context);
return true;
}
+ return false;
+ }
- private HashMap<String, String> getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) {
- HashMap<String, String> columnMapping = new HashMap<String, String> ();
- ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
- ArrayList<ExprNodeDesc> partitionCols = reduceSinkDesc.getPartitionCols();
- ArrayList<ExprNodeDesc> reduceKeyCols = reduceSinkDesc.getKeyCols();
- if(partitionCols != null) {
- for (ExprNodeDesc desc : partitionCols) {
- List<String> cols = desc.getCols();
- if ( cols != null ) {
- for(String col : cols) {
- columnMapping.put(col, col);
- }
- }
- }
- }
- if(reduceKeyCols != null) {
- for (ExprNodeDesc desc : reduceKeyCols) {
- List<String> cols = desc.getCols();
- if ( cols != null ) {
- for(String col : cols) {
- columnMapping.put(col, col);
- }
- }
- }
- }
- return columnMapping;
+ // pRS-pGBY-cRS-cGBY
+ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+ throws SemanticException {
+ Operator<?> start = getStartForGroupBy(cRS);
+ GroupByOperator pGBY = findPossibleParent(start, GroupByOperator.class, trustScript());
+ if (pGBY == null) {
+ return false;
}
+ ReduceSinkOperator pRS = getSingleParent(pGBY, ReduceSinkOperator.class);
+ if (pRS != null && merge(cRS, pRS, minReducer())) {
+ removeReduceSinkForGroupBy(cRS, cGBY, context);
+ return true;
+ }
+ return false;
+ }
+ }
- private ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childReduceSink, ParseContext pGraphContext) {
- Operator<? extends OperatorDesc> start = childReduceSink;
- while(start != null) {
- if (start.getParentOperators() == null
- || start.getParentOperators().size() != 1) {
- // this potentially is a join operator
- return null;
- }
+ static class JoinReducerProc extends AbstractReducerReducerProc {
- boolean allowed = false;
- if ((start instanceof SelectOperator)
- || (start instanceof FilterOperator)
- || (start instanceof ExtractOperator)
- || (start instanceof ForwardOperator)
- || (start instanceof ScriptOperator)
- || (start instanceof ReduceSinkOperator)) {
- allowed = true;
- }
+ // pRS-pJOIN-cRS
+ public Object process(ReduceSinkOperator cRS, ParseContext context)
+ throws SemanticException {
+ JoinOperator pJoin = findPossibleParent(cRS, JoinOperator.class, trustScript());
+ if (pJoin != null && merge(cRS, pJoin, minReducer())) {
+ pJoin.getConf().setFixedAsSorted(true);
+ replaceReduceSinkWithSelectOperator(cRS, context);
+ return true;
+ }
+ return false;
+ }
- if (!allowed) {
- return null;
- }
+ // pRS-pJOIN-cRS-cGBY
+ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+ throws SemanticException {
+ Operator<?> start = getStartForGroupBy(cRS);
+ JoinOperator pJoin = findPossibleParent(start, JoinOperator.class, trustScript());
+ if (pJoin != null && merge(cRS, pJoin, minReducer())) {
+ pJoin.getConf().setFixedAsSorted(true);
+ removeReduceSinkForGroupBy(cRS, cGBY, context);
+ return true;
+ }
+ return false;
+ }
+ }
- if ((start instanceof ScriptOperator)
- && !HiveConf.getBoolVar(pGraphContext.getConf(),
- HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) {
- return null;
- }
+ static class ReducerReducerProc extends AbstractReducerReducerProc {
- start = start.getParentOperators().get(0);
- if(start instanceof ReduceSinkOperator) {
- return (ReduceSinkOperator)start;
- }
- }
- return null;
+ // pRS-cRS
+ public Object process(ReduceSinkOperator cRS, ParseContext context)
+ throws SemanticException {
+ ReduceSinkOperator pRS = findPossibleParent(cRS, ReduceSinkOperator.class, trustScript());
+ if (pRS != null && merge(cRS, pRS, minReducer())) {
+ replaceReduceSinkWithSelectOperator(cRS, context);
+ return true;
}
+ return false;
}
+ // pRS-cRS-cGBY
+ public Object process(ReduceSinkOperator cRS, GroupByOperator cGBY, ParseContext context)
+ throws SemanticException {
+ Operator<?> start = getStartForGroupBy(cRS);
+ ReduceSinkOperator pRS = findPossibleParent(start, ReduceSinkOperator.class, trustScript());
+ if (pRS != null && merge(cRS, pRS, minReducer())) {
+ removeReduceSinkForGroupBy(cRS, cGBY, context);
+ return true;
+ }
+ return false;
+ }
}
}
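
A standalone copy of the checkOrder contract above, with sample inputs, makes the merge direction concrete (order strings use one '+'/'-' per key column, as in ReduceSinkDesc.getOrder(); 0 = identical, 1 = copy the child's setting up to the parent RS, -1 = the parent already covers the child, null = merge aborted):

    public class CheckOrderSketch {
      // Mirrors AbstractReducerReducerProc.checkOrder above.
      static Integer checkOrder(String corder, String porder) {
        if (corder == null || corder.trim().equals("")) {
          if (porder == null || porder.trim().equals("")) {
            return 0;
          }
          return -1;
        }
        if (porder == null || porder.trim().equals("")) {
          return 1;
        }
        corder = corder.trim();
        porder = porder.trim();
        int target = Math.min(corder.length(), porder.length());
        if (!corder.substring(0, target).equals(porder.substring(0, target))) {
          return null;
        }
        return Integer.valueOf(corder.length()).compareTo(porder.length());
      }

      public static void main(String[] args) {
        System.out.println(checkOrder("++", "++"));  // 0: identical
        System.out.println(checkOrder("++-", "++")); // 1: child is stricter
        System.out.println(checkOrder("+", "++"));   // -1: parent covers child
        System.out.println(checkOrder("+-", "++"));  // null: conflict, no merge
      }
    }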
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java Wed Apr 17 07:29:38 2013
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.PrunerUtils;
@@ -290,12 +291,13 @@ public class ListBucketingPruner impleme
List<List<String>> uniqSkewedValues) throws SemanticException {
// For each entry in dynamic-multi-dimension collection.
List<String> skewedCols = part.getSkewedColNames(); // Retrieve skewed column.
- Map<List<String>, String> mappings = part.getSkewedColValueLocationMaps(); // Retrieve skewed
+ Map<SkewedValueList, String> mappings = part.getSkewedColValueLocationMaps(); // Retrieve skewed
// map.
assert ListBucketingPrunerUtils.isListBucketingPart(part) : part.getName()
+ " skewed metadata is corrupted. No skewed column and/or location mappings information.";
List<List<String>> skewedValues = part.getSkewedColValues();
List<Boolean> nonSkewedValueMatchResult = new ArrayList<Boolean>();
+ SkewedValueList skewedValueList = new SkewedValueList();
for (List<String> cell : collections) {
// Walk through the tree to decide value.
// Example: skewed column: C1, C2 ;
@@ -309,8 +311,9 @@ public class ListBucketingPruner impleme
/* It's valid case if a partition: */
/* 1. is defined with skewed columns and skewed values in metadata */
/* 2. doesn't have all skewed values within its data */
- if (mappings.get(cell) != null) {
- selectedPaths.add(new Path(mappings.get(cell)));
+ skewedValueList.setSkewedValueList(cell);
+ if (mappings.get(skewedValueList) != null) {
+ selectedPaths.add(new Path(mappings.get(skewedValueList)));
}
}
} else {
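
The map is now keyed by the Thrift struct SkewedValueList rather than a raw List<String>; the Thrift-generated equals/hashCode are defined over the wrapped list, which is what lets the pruner reuse one instance and re-point it at each cell before the lookup. A rough sketch of the lookup pattern (values and location are hypothetical):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.metastore.api.SkewedValueList;

    public class SkewedLookupSketch {
      public static void main(String[] args) {
        // Hypothetical location map, as Partition.getSkewedColValueLocationMaps()
        // would return it for skewed columns (C1, C2).
        Map<SkewedValueList, String> mappings = new HashMap<SkewedValueList, String>();
        SkewedValueList k = new SkewedValueList();
        k.setSkewedValueList(Arrays.asList("a", "1"));
        mappings.put(k, "/warehouse/t/c1=a/c2=1");

        // Reuse a single probe instance, as the pruner does above.
        SkewedValueList probe = new SkewedValueList();
        probe.setSkewedValueList(Arrays.asList("a", "1"));
        System.out.println(mappings.get(probe)); // /warehouse/t/c1=a/c2=1
      }
    }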
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Wed Apr 17 07:29:38 2013
@@ -120,6 +120,8 @@ public class CommonJoinResolver implemen
*/
class CommonJoinTaskDispatcher implements Dispatcher {
+ HashMap<String, Long> aliasToSize = null;
+
private final PhysicalContext physicalContext;
public CommonJoinTaskDispatcher(PhysicalContext context) {
@@ -145,7 +147,7 @@ public class CommonJoinResolver implemen
* A task and its child task has been converted from join to mapjoin.
* See if the two tasks can be merged.
*/
- private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
+ private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
MapRedTask childTask = (MapRedTask)task.getChildTasks().get(0);
MapredWork work = task.getWork();
MapredLocalWork localWork = work.getMapLocalWork();
@@ -194,6 +196,33 @@ public class CommonJoinResolver implemen
if (childWork.getAliasToWork().size() > 1) {
return;
}
+ long mapJoinSize = HiveConf.getLongVar(conf,
+ HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+ long localTableTotalSize = 0;
+ for (String alias : localWork.getAliasToWork().keySet()) {
+ Long tabSize = aliasToSize.get(alias);
+ if (tabSize == null) {
+ /* if the size is unavailable, assume a size just over mapJoinSize;
+ * this implies the merge cannot happen, so we can return.
+ */
+ return;
+ }
+ localTableTotalSize += tabSize;
+ }
+
+ for (String alias : childLocalWork.getAliasToWork().keySet()) {
+ Long tabSize = aliasToSize.get(alias);
+ if (tabSize == null) {
+ /* if the size is unavailable, assume a size just over mapJoinSize;
+ * this implies the merge cannot happen, so we can return.
+ */
+ return;
+ }
+ localTableTotalSize += tabSize;
+ if (localTableTotalSize > mapJoinSize) {
+ return;
+ }
+ }
Operator<? extends Serializable> childAliasOp =
childWork.getAliasToWork().values().iterator().next();
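
The new gate sums the known sizes of every small-table alias in both local works and refuses the merge if any size is unknown or the running total exceeds the noconditionaltask threshold. Condensed into one standalone check (alias names and sizes are made up):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class MergeGateSketch {
      // True only if every alias has a known size and the sum stays within
      // mapJoinSize -- the condition the merge above requires.
      static boolean canMerge(List<String> aliases, Map<String, Long> aliasToSize,
          long mapJoinSize) {
        long total = 0;
        for (String alias : aliases) {
          Long size = aliasToSize.get(alias);
          if (size == null) {
            return false; // unknown size: treat as over the threshold
          }
          total += size;
          if (total > mapJoinSize) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, Long> sizes = new HashMap<String, Long>();
        sizes.put("b", 10L << 20);
        sizes.put("c", 5L << 20);
        System.out.println(canMerge(Arrays.asList("b", "c"), sizes, 25L << 20)); // true
        System.out.println(canMerge(Arrays.asList("b", "d"), sizes, 25L << 20)); // false
      }
    }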
@@ -234,11 +263,8 @@ public class CommonJoinResolver implemen
}
// create map join task and set big table as bigTablePosition
- private ObjectPair<MapRedTask, String> convertTaskToMapJoinTask(String xml,
- int bigTablePosition) throws UnsupportedEncodingException, SemanticException {
- // deep copy a new mapred work from xml
- InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+ private ObjectPair<MapRedTask, String> convertTaskToMapJoinTask(MapredWork newWork,
+ int bigTablePosition) throws SemanticException {
// create a mapred task for this work
MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
.getParseContext().getConf());
@@ -256,7 +282,7 @@ public class CommonJoinResolver implemen
// whether it contains common join op; if contains, return this common join op
JoinOperator joinOp = getJoinOp(currTask);
- if (joinOp == null) {
+ if (joinOp == null || joinOp.getConf().isFixedAsSorted()) {
return null;
}
currTask.setTaskTag(Task.COMMON_JOIN);
@@ -282,7 +308,10 @@ public class CommonJoinResolver implemen
int numAliases = order.length;
long aliasTotalKnownInputSize = 0;
- HashMap<String, Long> aliasToSize = new HashMap<String, Long>();
+
+ if (aliasToSize == null) {
+ aliasToSize = new HashMap<String, Long>();
+ }
try {
// go over all the input paths, and calculate a known total size, known
// size for each input alias.
@@ -369,11 +398,10 @@ public class CommonJoinResolver implemen
String bigTableAlias = null;
currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
currWork.setJoinTree(joinTree);
- String xml = currWork.toXML();
if (convertJoinMapJoin) {
// create map join task and set big table as bigTablePosition
- MapRedTask newTask = convertTaskToMapJoinTask(xml, bigTablePosition).getFirst();
+ MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();
newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
replaceTask(currTask, newTask, physicalContext);
@@ -384,7 +412,7 @@ public class CommonJoinResolver implemen
// followed by a mapjoin can be performed in a single MR job.
if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)
&& (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) {
- mergeMapJoinTaskWithChildMapJoinTask(newTask);
+ mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
}
return newTask;
@@ -392,14 +420,19 @@ public class CommonJoinResolver implemen
long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf,
HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
+ String xml = currWork.toXML();
for (int i = 0; i < numAliases; i++) {
// this table cannot be big table
if (!bigTableCandidates.contains(i)) {
continue;
}
+ // deep copy a new mapred work from xml
+ InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
+ MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+
// create map join task and set big table as i
- ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(xml, i);
+ ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);
MapRedTask newTask = newTaskAlias.getFirst();
bigTableAlias = newTaskAlias.getSecond();
@@ -502,10 +535,10 @@ public class CommonJoinResolver implemen
currTask.setParentTasks(null);
if (parentTasks != null) {
for (Task<? extends Serializable> tsk : parentTasks) {
- // make new generated task depends on all the parent tasks of current task.
- tsk.addDependentTask(newTask);
// remove the current task from its original parent task's dependent task
tsk.removeDependentTask(currTask);
+ // make the newly generated task depend on all the parent tasks of the current task.
+ tsk.addDependentTask(newTask);
}
} else {
// remove from current root task and add conditional task to root tasks
@@ -518,10 +551,10 @@ public class CommonJoinResolver implemen
currTask.setChildTasks(null);
if (oldChildTasks != null) {
for (Task<? extends Serializable> tsk : oldChildTasks) {
- // make new generated task depends on all the parent tasks of current task.
- newTask.addDependentTask(tsk);
// remove the current task from its original parent task's dependent task
tsk.getParentTasks().remove(currTask);
+ // make each original child task depend on the newly generated task.
+ newTask.addDependentTask(tsk);
}
}
}
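
Moving the XML round-trip out of convertTaskToMapJoinTask means the plan is serialized once and deserialized once per big-table candidate, each call yielding an independent deep copy. The same serialize-once/deserialize-per-consumer pattern, reduced to plain Java serialization for a runnable sketch:

    import java.io.*;
    import java.util.ArrayList;
    import java.util.List;

    public class DeepCopySketch {
      // Analogous to currWork.toXML() + Utilities.deserializeMapRedWork above.
      static <T extends Serializable> T deepCopy(byte[] bytes) throws Exception {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }

      public static void main(String[] args) throws Exception {
        List<String> work = new ArrayList<String>();
        work.add("alias-a");
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(work);
        oos.close();

        List<String> copy1 = deepCopy(bos.toByteArray());
        List<String> copy2 = deepCopy(bos.toByteArray());
        copy1.add("mutated-for-candidate-0");
        System.out.println(copy2); // [alias-a] -- copies are independent
      }
    }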
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinProcFactory.java Wed Apr 17 07:29:38 2013
@@ -52,6 +52,9 @@ public final class SkewJoinProcFactory {
Object... nodeOutputs) throws SemanticException {
SkewJoinProcCtx context = (SkewJoinProcCtx) ctx;
JoinOperator op = (JoinOperator) nd;
+ if (op.getConf().isFixedAsSorted()) {
+ return null;
+ }
ParseContext parseContext = context.getParseCtx();
Task<? extends Serializable> currentTsk = context.getCurrentTask();
GenMRSkewJoinProcessor.processSkewJoin(op, currentTsk, parseContext);
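
isFixedAsSorted is the handshake between ReduceSinkDeDuplication (which sets it after folding a child RS into a join) and the physical optimizers: both this skew-join processor and CommonJoinResolver now bail out for such joins, since rewriting them would break the sort/partition contract the merge relied on. A minimal sketch of the guard (JoinDescLike is a stand-in for JoinDesc):

    public class FixedAsSortedSketch {
      static class JoinDescLike {
        private boolean fixedAsSorted;
        boolean isFixedAsSorted() { return fixedAsSorted; }
        void setFixedAsSorted(boolean v) { fixedAsSorted = v; }
      }

      public static void main(String[] args) {
        JoinDescLike conf = new JoinDescLike();
        conf.setFixedAsSorted(true); // done by the join-RS merge above
        // Physical rewrites check the flag and skip the join when set.
        System.out.println(conf.isFixedAsSorted() ? "skip rewrite" : "proceed");
      }
    }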
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Wed Apr 17 07:29:38 2013
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryProperties;
@@ -965,7 +966,7 @@ public abstract class BaseSemanticAnalyz
* @return
*/
protected ListBucketingCtx constructListBucketingCtx(List<String> skewedColNames,
- List<List<String>> skewedValues, Map<List<String>, String> skewedColValueLocationMaps,
+ List<List<String>> skewedValues, Map<SkewedValueList, String> skewedColValueLocationMaps,
boolean isStoredAsSubDirectories, HiveConf conf) {
ListBucketingCtx lbCtx = new ListBucketingCtx();
lbCtx.setSkewedColNames(skewedColNames);
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Wed Apr 17 07:29:38 2013
@@ -1828,6 +1828,7 @@ body
:
insertClause
selectClause
+ lateralView?
whereClause?
groupByClause?
havingClause?
@@ -1836,11 +1837,12 @@ body
distributeByClause?
sortByClause?
window_clause?
- limitClause? -> ^(TOK_INSERT insertClause?
- selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
+ limitClause? -> ^(TOK_INSERT insertClause
+ selectClause lateralView? whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
distributeByClause? sortByClause? window_clause? limitClause?)
|
selectClause
+ lateralView?
whereClause?
groupByClause?
havingClause?
@@ -1850,7 +1852,7 @@ body
sortByClause?
window_clause?
limitClause? -> ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
- selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
+ selectClause lateralView? whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
distributeByClause? sortByClause? window_clause? limitClause?)
;
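
With lateralView? added to both alternatives of body, a LATERAL VIEW may now follow the select clause of each branch of a multi-insert. A rough parse-only check (table and column names are hypothetical; assumes ParseDriver.parse(String) as used by the ql parser tests):

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.ParseDriver;

    public class LateralViewParseSketch {
      public static void main(String[] args) throws Exception {
        // Per-insert LATERAL VIEW in a multi-insert body -- rejected by the
        // old grammar, accepted by the new one.
        String query =
            "FROM src "
          + "INSERT OVERWRITE TABLE dst1 "
          + "  SELECT key, a LATERAL VIEW explode(arr) t AS a "
          + "INSERT OVERWRITE TABLE dst2 "
          + "  SELECT key";
        ASTNode tree = new ParseDriver().parse(query);
        System.out.println(tree.toStringTree());
      }
    }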
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Wed Apr 17 07:29:38 2013
@@ -909,7 +909,7 @@ public class PTFTranslator {
}
@SuppressWarnings({"unchecked"})
- private static void addOIPropertiestoSerDePropsMap(StructObjectInspector OI,
+ public static void addOIPropertiestoSerDePropsMap(StructObjectInspector OI,
Map<String,String> serdePropsMap) {
if ( serdePropsMap == null ) {
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Wed Apr 17 07:29:38 2013
@@ -90,6 +90,8 @@ public class QBParseInfo {
*/
private final HashMap<String, ArrayList<ASTNode>> aliasToLateralViews;
+ private final HashMap<String, ASTNode> destToLateralView;
+
/* Order by clause */
private final HashMap<String, ASTNode> destToOrderby;
private final HashMap<String, Integer> destToLimit;
@@ -111,6 +113,7 @@ public class QBParseInfo {
nameToDest = new HashMap<String, ASTNode>();
nameToSample = new HashMap<String, TableSample>();
exprToColumnAlias = new HashMap<ASTNode, String>();
+ destToLateralView = new HashMap<String, ASTNode>();
destToSelExpr = new LinkedHashMap<String, ASTNode>();
destToWhereExpr = new HashMap<String, ASTNode>();
destToGroupby = new HashMap<String, ASTNode>();
@@ -552,6 +555,9 @@ public class QBParseInfo {
return nameToSample;
}
+ public HashMap<String, ASTNode> getDestToLateralView() {
+ return destToLateralView;
+ }
protected static enum ClauseType {
CLUSTER_BY_CLAUSE,
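
destToLateralView pairs each destination clause with the TOK_LATERAL_VIEW node found during phase 1 (see the SemanticAnalyzer hunk below); common-distinct processing is then disabled whenever the map is non-empty. A toy illustration of the bookkeeping (the clause name and the Object stand-in for ASTNode are made up):

    import java.util.HashMap;

    public class DestToLateralViewSketch {
      public static void main(String[] args) {
        // Stand-in for QBParseInfo.destToLateralView: dest clause -> LV node.
        HashMap<String, Object> destToLateralView = new HashMap<String, Object>();
        destToLateralView.put("insclause-0", new Object()); // recorded in doPhase1

        // getCommonDistinctExprs bails out once any dest carries an LV.
        boolean commonProcessingPossible = destToLateralView.isEmpty();
        System.out.println(commonProcessingPossible); // false
      }
    }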
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Wed Apr 17 07:29:38 2013
@@ -1023,7 +1023,13 @@ public class SemanticAnalyzer extends Ba
.getMsg(partition.toString()));
}
}
-
+ skipRecursion = false;
+ break;
+ case HiveParser.TOK_LATERAL_VIEW:
+ // todo: nested LV
+ assert ast.getChildCount() == 1;
+ qb.getParseInfo().getDestToLateralView().put(ctx_1.dest, ast);
+ break;
default:
skipRecursion = false;
break;
@@ -3989,7 +3995,7 @@ public class SemanticAnalyzer extends Ba
}
@SuppressWarnings({"nls"})
- private Operator genGroupByPlan1MRMultiReduceGB(List<String> dests, QB qb, Operator input)
+ private Operator genGroupByPlan1ReduceMultiGBY(List<String> dests, QB qb, Operator input)
throws SemanticException {
QBParseInfo parseInfo = qb.getParseInfo();
@@ -6811,9 +6817,14 @@ public class SemanticAnalyzer extends Ba
// Return the common distinct expression
// There should be more than 1 destination, with group bys in all of them.
private List<ASTNode> getCommonDistinctExprs(QB qb, Operator input) {
- RowResolver inputRR = opParseCtx.get(input).getRowResolver();
QBParseInfo qbp = qb.getParseInfo();
+ // If a grouping set aggregation is present, common processing is not possible
+ if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()
+ || !qbp.getDestToLateralView().isEmpty()) {
+ return null;
+ }
+ RowResolver inputRR = opParseCtx.get(input).getRowResolver();
TreeSet<String> ks = new TreeSet<String>();
ks.addAll(qbp.getClauseNames());
@@ -6822,15 +6833,10 @@ public class SemanticAnalyzer extends Ba
return null;
}
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> oldList = null;
+ List<ExprNodeDesc> oldList = null;
List<ASTNode> oldASTList = null;
for (String dest : ks) {
- // If a grouping set aggregation is present, common processing is not possible
- if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()) {
- return null;
- }
-
// If a filter is present, common processing is not possible
if (qbp.getWhrForClause(dest) != null) {
return null;
@@ -6847,7 +6853,7 @@ public class SemanticAnalyzer extends Ba
return null;
}
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> currDestList;
+ List<ExprNodeDesc> currDestList;
try {
currDestList = getDistinctExprs(qbp, dest, inputRR);
} catch (SemanticException e) {
@@ -6968,10 +6974,9 @@ public class SemanticAnalyzer extends Ba
// Groups the clause names into lists so that any two clauses in the same list have the same
// group by and distinct keys and no clause appears in more than one list. Returns a list of the
// lists of clauses.
- private List<List<String>> getCommonGroupByDestGroups(QB qb, Operator input)
- throws SemanticException {
+ private List<List<String>> getCommonGroupByDestGroups(QB qb,
+ Map<String, Operator<? extends OperatorDesc>> inputs) throws SemanticException {
- RowResolver inputRR = opParseCtx.get(input).getRowResolver();
QBParseInfo qbp = qb.getParseInfo();
TreeSet<String> ks = new TreeSet<String>();
@@ -6989,29 +6994,31 @@ public class SemanticAnalyzer extends Ba
return commonGroupByDestGroups;
}
- List<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>> sprayKeyLists =
- new ArrayList<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>>(ks.size());
+ List<Operator<? extends OperatorDesc>> inputOperators =
+ new ArrayList<Operator<? extends OperatorDesc>>(ks.size());
+ List<List<ExprNodeDesc>> sprayKeyLists = new ArrayList<List<ExprNodeDesc>>(ks.size());
// Iterate over each clause
for (String dest : ks) {
-
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> sprayKeys =
- getDistinctExprs(qbp, dest, inputRR);
+ Operator input = inputs.get(dest);
+ RowResolver inputRR = opParseCtx.get(input).getRowResolver();
+ List<ExprNodeDesc> sprayKeys = getDistinctExprs(qbp, dest, inputRR);
// Add the group by expressions
List<ASTNode> grpByExprs = getGroupByForClause(qbp, dest);
for (ASTNode grpByExpr : grpByExprs) {
- ExprNodeDesc.ExprNodeDescEqualityWrapper grpByExprWrapper =
- new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(grpByExpr, inputRR));
- if (!sprayKeys.contains(grpByExprWrapper)) {
- sprayKeys.add(grpByExprWrapper);
+ ExprNodeDesc exprDesc = genExprNodeDesc(grpByExpr, inputRR);
+ if (ExprNodeDescUtils.indexOf(exprDesc, sprayKeys) < 0) {
+ sprayKeys.add(exprDesc);
}
}
// Loop through each of the lists of exprs, looking for a match
boolean found = false;
for (int i = 0; i < sprayKeyLists.size(); i++) {
-
+ if (!input.equals(inputOperators.get(i))) {
+ continue;
+ }
if (!matchExprLists(sprayKeyLists.get(i), sprayKeys)) {
continue;
}
@@ -7024,6 +7031,7 @@ public class SemanticAnalyzer extends Ba
// No match was found, so create new entries
if (!found) {
+ inputOperators.add(input);
sprayKeyLists.add(sprayKeys);
List<String> destGroup = new ArrayList<String>();
destGroup.add(dest);
@@ -7035,15 +7043,13 @@ public class SemanticAnalyzer extends Ba
}
// Returns whether or not two lists contain the same elements independent of order
- private boolean matchExprLists(List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list1,
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list2) {
+ private boolean matchExprLists(List<ExprNodeDesc> list1, List<ExprNodeDesc> list2) {
if (list1.size() != list2.size()) {
return false;
}
-
- for (ExprNodeDesc.ExprNodeDescEqualityWrapper exprNodeDesc : list1) {
- if (!list2.contains(exprNodeDesc)) {
+ for (ExprNodeDesc exprNodeDesc : list1) {
+ if (ExprNodeDescUtils.indexOf(exprNodeDesc, list2) < 0) {
return false;
}
}
@@ -7051,23 +7057,20 @@ public class SemanticAnalyzer extends Ba
return true;
}
- // Returns a list of the distinct exprs for a given clause name as
- // ExprNodeDesc.ExprNodeDescEqualityWrapper without duplicates
- private List<ExprNodeDesc.ExprNodeDescEqualityWrapper>
- getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR) throws SemanticException {
+ // Returns a list of the distinct exprs without duplicates for a given clause name
+ private List<ExprNodeDesc> getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR)
+ throws SemanticException {
List<ASTNode> distinctAggExprs = qbp.getDistinctFuncExprsForClause(dest);
- List<ExprNodeDesc.ExprNodeDescEqualityWrapper> distinctExprs =
- new ArrayList<ExprNodeDesc.ExprNodeDescEqualityWrapper>();
+ List<ExprNodeDesc> distinctExprs = new ArrayList<ExprNodeDesc>();
for (ASTNode distinctAggExpr : distinctAggExprs) {
// 0 is function name
for (int i = 1; i < distinctAggExpr.getChildCount(); i++) {
ASTNode parameter = (ASTNode) distinctAggExpr.getChild(i);
- ExprNodeDesc.ExprNodeDescEqualityWrapper distinctExpr =
- new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(parameter, inputRR));
- if (!distinctExprs.contains(distinctExpr)) {
- distinctExprs.add(distinctExpr);
+ ExprNodeDesc expr = genExprNodeDesc(parameter, inputRR);
+ if (ExprNodeDescUtils.indexOf(expr, distinctExprs) < 0) {
+ distinctExprs.add(expr);
}
}
}
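
For reference, the ExprNodeDescUtils.indexOf calls above replace the old
ExprNodeDescEqualityWrapper/contains idiom with a structural-equality scan.
A minimal sketch of the assumed semantics (the actual implementation lives in
ExprNodeDescUtils.java and may differ in detail):

    // Assumed sketch: linear scan using ExprNodeDesc.isSame() for structural
    // equality, returning -1 when the expression is not present.
    public static int indexOf(ExprNodeDesc expr, List<ExprNodeDesc> exprs) {
      for (int i = 0; i < exprs.size(); i++) {
        if (expr.isSame(exprs.get(i))) {
          return i;
        }
      }
      return -1;
    }
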
@@ -7096,6 +7099,7 @@ public class SemanticAnalyzer extends Ba
QBParseInfo qbp = qb.getParseInfo();
TreeSet<String> ks = new TreeSet<String>(qbp.getClauseNames());
+ Map<String, Operator<? extends OperatorDesc>> inputs = createInputForDests(qb, input, ks);
// For multi-group by with the same distinct, we ignore all user hints
// currently. It doesn't matter whether the user has asked to do
// map-side aggregation or not. Map-side aggregation is turned off
@@ -7148,7 +7152,7 @@ public class SemanticAnalyzer extends Ba
// expressions, otherwise treat all the expressions as a single group
if (conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
try {
- commonGroupByDestGroups = getCommonGroupByDestGroups(qb, curr);
+ commonGroupByDestGroups = getCommonGroupByDestGroups(qb, inputs);
} catch (SemanticException e) {
LOG.error("Failed to group clauses by common spray keys.", e);
}
@@ -7168,6 +7172,8 @@ public class SemanticAnalyzer extends Ba
}
String firstDest = commonGroupByDestGroup.get(0);
+ input = inputs.get(firstDest);
+
// Constructs a standard group by plan if:
// There is no other subquery with the same group by/distinct keys or
// (There are no aggregations in a representative query for the group and
@@ -7182,7 +7188,7 @@ public class SemanticAnalyzer extends Ba
// Go over all the destination tables
for (String dest : commonGroupByDestGroup) {
- curr = input;
+ curr = inputs.get(dest);
if (qbp.getWhrForClause(dest) != null) {
curr = genFilterPlan(dest, qb, curr);
@@ -7215,7 +7221,7 @@ public class SemanticAnalyzer extends Ba
curr = genPostGroupByBodyPlan(curr, dest, qb);
}
} else {
- curr = genGroupByPlan1MRMultiReduceGB(commonGroupByDestGroup, qb, input);
+ curr = genGroupByPlan1ReduceMultiGBY(commonGroupByDestGroup, qb, input);
}
}
}
@@ -7228,6 +7234,16 @@ public class SemanticAnalyzer extends Ba
return curr;
}
+ private Map<String, Operator<? extends OperatorDesc>> createInputForDests(QB qb,
+ Operator<? extends OperatorDesc> input, Set<String> dests) throws SemanticException {
+ Map<String, Operator<? extends OperatorDesc>> inputs =
+ new HashMap<String, Operator<? extends OperatorDesc>>();
+ for (String dest : dests) {
+ inputs.put(dest, genLateralViewPlanForDest(dest, qb, input));
+ }
+ return inputs;
+ }
+
private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb)
throws SemanticException {
@@ -7308,7 +7324,9 @@ public class SemanticAnalyzer extends Ba
extraMRStep);
qb.getParseInfo().setOuterQueryLimit(limit.intValue());
}
- curr = genFileSinkPlan(dest, qb, curr);
+ if (!SessionState.get().getHiveOperation().equals(HiveOperation.CREATEVIEW)) {
+ curr = genFileSinkPlan(dest, qb, curr);
+ }
}
// change curr op's row resolver's tab aliases to query alias if it
@@ -8035,71 +8053,7 @@ public class SemanticAnalyzer extends Ba
// -> LateralViewJoinOperator
//
- RowResolver lvForwardRR = new RowResolver();
- RowResolver source = opParseCtx.get(op).getRowResolver();
- for (ColumnInfo col : source.getColumnInfos()) {
- if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
- continue;
- }
- String[] tabCol = source.reverseLookup(col.getInternalName());
- lvForwardRR.put(tabCol[0], tabCol[1], col);
- }
-
- Operator lvForward = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new LateralViewForwardDesc(), new RowSchema(lvForwardRR.getColumnInfos()),
- op), lvForwardRR);
-
- // The order in which the two paths are added is important. The
- // lateral view join operator depends on having the select operator
- // give it the row first.
-
- // Get the all path by making a select(*).
- RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
- // Operator allPath = op;
- Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
- lvForward), allPathRR);
- // Get the UDTF Path
- QB blankQb = new QB(null, null, false);
- Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree
- .getChild(0), blankQb, lvForward);
- // add udtf aliases to QB
- for (String udtfAlias : blankQb.getAliases()) {
- qb.addAlias(udtfAlias);
- }
- RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
-
- // Merge the two into the lateral view join
- // The cols of the merged result will be the combination of both the
- // cols of the UDTF path and the cols of the all path. The internal
- // names have to be changed to avoid conflicts
-
- RowResolver lateralViewRR = new RowResolver();
- ArrayList<String> outputInternalColNames = new ArrayList<String>();
-
- LVmergeRowResolvers(allPathRR, lateralViewRR, outputInternalColNames);
- LVmergeRowResolvers(udtfPathRR, lateralViewRR, outputInternalColNames);
-
- // For PPD, we need a column to expression map so that during the walk,
- // the processor knows how to transform the internal col names.
- // Following steps are dependant on the fact that we called
- // LVmerge.. in the above order
- Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-
- int i = 0;
- for (ColumnInfo c : allPathRR.getColumnInfos()) {
- String internalName = getColumnInternalName(i);
- i++;
- colExprMap.put(internalName,
- new ExprNodeColumnDesc(c.getType(), c.getInternalName(),
- c.getTabAlias(), c.getIsVirtualCol()));
- }
-
- Operator lateralViewJoin = putOpInsertMap(OperatorFactory
- .getAndMakeChild(new LateralViewJoinDesc(outputInternalColNames),
- new RowSchema(lateralViewRR.getColumnInfos()), allPath,
- udtfPath), lateralViewRR);
- lateralViewJoin.setColumnExprMap(colExprMap);
+ Operator lateralViewJoin = genLateralViewPlan(qb, op, lateralViewTree);
op = lateralViewJoin;
}
e.setValue(op);
@@ -8107,6 +8061,85 @@ public class SemanticAnalyzer extends Ba
}
}
+ private Operator genLateralViewPlanForDest(String dest, QB qb, Operator op)
+ throws SemanticException {
+ ASTNode lateralViewTree = qb.getParseInfo().getDestToLateralView().get(dest);
+ if (lateralViewTree != null) {
+ return genLateralViewPlan(qb, op, lateralViewTree);
+ }
+ return op;
+ }
+
+ private Operator genLateralViewPlan(QB qb, Operator op, ASTNode lateralViewTree)
+ throws SemanticException {
+ RowResolver lvForwardRR = new RowResolver();
+ RowResolver source = opParseCtx.get(op).getRowResolver();
+ for (ColumnInfo col : source.getColumnInfos()) {
+ if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
+ continue;
+ }
+ String[] tabCol = source.reverseLookup(col.getInternalName());
+ lvForwardRR.put(tabCol[0], tabCol[1], col);
+ }
+
+ Operator lvForward = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new LateralViewForwardDesc(), new RowSchema(lvForwardRR.getColumnInfos()),
+ op), lvForwardRR);
+
+ // The order in which the two paths are added is important. The
+ // lateral view join operator depends on having the select operator
+ // give it the row first.
+
+ // Get the all path by making a select(*).
+ RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
+ // Operator allPath = op;
+ Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
+ lvForward), allPathRR);
+ // Get the UDTF Path
+ QB blankQb = new QB(null, null, false);
+ Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree
+ .getChild(0), blankQb, lvForward);
+ // add udtf aliases to QB
+ for (String udtfAlias : blankQb.getAliases()) {
+ qb.addAlias(udtfAlias);
+ }
+ RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
+
+ // Merge the two into the lateral view join
+ // The cols of the merged result will be the combination of both the
+ // cols of the UDTF path and the cols of the all path. The internal
+ // names have to be changed to avoid conflicts
+
+ RowResolver lateralViewRR = new RowResolver();
+ ArrayList<String> outputInternalColNames = new ArrayList<String>();
+
+ LVmergeRowResolvers(allPathRR, lateralViewRR, outputInternalColNames);
+ LVmergeRowResolvers(udtfPathRR, lateralViewRR, outputInternalColNames);
+
+ // For PPD, we need a column to expression map so that during the walk,
+ // the processor knows how to transform the internal col names.
+ // Following steps are dependent on the fact that we called
+ // LVmerge.. in the above order
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+ int i = 0;
+ for (ColumnInfo c : allPathRR.getColumnInfos()) {
+ String internalName = getColumnInternalName(i);
+ i++;
+ colExprMap.put(internalName,
+ new ExprNodeColumnDesc(c.getType(), c.getInternalName(),
+ c.getTabAlias(), c.getIsVirtualCol()));
+ }
+
+ Operator lateralViewJoin = putOpInsertMap(OperatorFactory
+ .getAndMakeChild(new LateralViewJoinDesc(outputInternalColNames),
+ new RowSchema(lateralViewRR.getColumnInfos()), allPath,
+ udtfPath), lateralViewRR);
+ lateralViewJoin.setColumnExprMap(colExprMap);
+ return lateralViewJoin;
+ }
+
/**
* A helper function that gets all the columns and respective aliases in the
* source and puts them into dest. It renames the internal names of the
@@ -8702,7 +8735,7 @@ public class SemanticAnalyzer extends Ba
}
if (LOG.isDebugEnabled()) {
- LOG.debug("\n" + Operator.toString(pCtx.getTopOps().values()));
+ LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
Optimizer optm = new Optimizer();
@@ -8711,7 +8744,7 @@ public class SemanticAnalyzer extends Ba
pCtx = optm.optimize();
if (LOG.isDebugEnabled()) {
- LOG.debug("\n" + Operator.toString(pCtx.getTopOps().values()));
+ LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
// Generate column access stats if required - wait until column pruning takes place
@@ -10486,6 +10519,7 @@ public class SemanticAnalyzer extends Ba
Map<String, ExprNodeDesc> colExprMap,
List<String> outputColumnNames,
StringBuilder orderString,
+ RowResolver rsOpRR,
RowResolver extractRR) throws SemanticException {
ArrayList<PTFExpressionDef> partColList = tabDef.getPartition().getExpressions();
@@ -10515,16 +10549,12 @@ public class SemanticAnalyzer extends Ba
orderCols.add(colDef.getExprNode());
}
+ ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
/*
- * We add the column to value columns or output column names
- * only if it is not a virtual column
+ * construct the ReduceSinkRR
*/
- ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
- LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
- new LinkedHashMap<String[], ColumnInfo>();
int pos = 0;
for (ColumnInfo colInfo : colInfoList) {
- if (!colInfo.isHiddenVirtualCol()) {
ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
.getInternalName(), colInfo.getTabAlias(), colInfo
.getIsVirtualCol());
@@ -10534,32 +10564,45 @@ public class SemanticAnalyzer extends Ba
outputColumnNames.add(outColName);
String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
- /*
- * if we have already encountered this colInfo internalName.
- * We encounter it again because it must be put for the Having clause.
- * We will add these entries in the end; in a loop on colsAddedByHaving. See below.
- */
- if ( colsAddedByHaving.containsKey(alias)) {
- continue;
- }
- ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
- ColumnInfo eColInfo = new ColumnInfo(
+ ColumnInfo newColInfo = new ColumnInfo(
outColName, colInfo.getType(), alias[0],
colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+ rsOpRR.put(alias[0], alias[1], newColInfo);
+ }
- if ( astNode == null ) {
- extractRR.put(alias[0], alias[1], eColInfo);
- }
- else {
- /*
- * in case having clause refers to this column may have been added twice;
- * once with the ASTNode.toStringTree as the alias
- * and then with the real alias.
- */
- extractRR.putExpression(astNode, eColInfo);
- if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
- colsAddedByHaving.put(alias, eColInfo);
- }
+ /*
+ * construct the ExtractRR
+ */
+ LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+ new LinkedHashMap<String[], ColumnInfo>();
+ pos = 0;
+ for (ColumnInfo colInfo : colInfoList) {
+ String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+ /*
+ * Skip if we have already encountered this colInfo's internalName. We
+ * encounter it again because it must also be added for the HAVING clause.
+ * Those entries are added at the end, in a loop over colsAddedByHaving. See below.
+ */
+ if ( colsAddedByHaving.containsKey(alias)) {
+ continue;
+ }
+ ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+ ColumnInfo eColInfo = new ColumnInfo(
+ SemanticAnalyzer.getColumnInternalName(pos++), colInfo.getType(), alias[0],
+ colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+ if ( astNode == null ) {
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+ else {
+ /*
+ * In case the HAVING clause refers to this column, it may have been added
+ * twice: once with ASTNode.toStringTree() as the alias
+ * and then with the real alias.
+ */
+ extractRR.putExpression(astNode, eColInfo);
+ if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+ colsAddedByHaving.put(alias, eColInfo);
}
}
}
@@ -10578,6 +10621,8 @@ public class SemanticAnalyzer extends Ba
*/
RowResolver rr = opParseCtx.get(input).getRowResolver();
PTFDesc ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+
+ RowResolver rsOpRR = new RowResolver();
/*
* Build an RR for the Extract Op from the ReduceSink Op's RR.
* Why?
@@ -10647,13 +10692,14 @@ public class SemanticAnalyzer extends Ba
colExprMap,
outputColumnNames,
orderString,
+ rsOpRR,
extractOpRR);
input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
.getReduceSinkDesc(orderCols,
valueCols, outputColumnNames, false,
-1, partCols, orderString.toString(), -1),
- new RowSchema(rr.getColumnInfos()), input), rr);
+ new RowSchema(rsOpRR.getColumnInfos()), input), rsOpRR);
input.setColumnExprMap(colExprMap);
}
@@ -10775,7 +10821,7 @@ public class SemanticAnalyzer extends Ba
.getReduceSinkDesc(orderCols,
valueCols, outputColumnNames, false,
-1, partCols, orderString.toString(), -1),
- new RowSchema(inputRR.getColumnInfos()), input), rsNewRR);
+ new RowSchema(rsNewRR.getColumnInfos()), input), rsNewRR);
input.setColumnExprMap(colExprMap);
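
Both RowSchema fixes above address the same mismatch: the ReduceSink operator
was registered with one RowResolver while its RowSchema was built from a
different one. A hedged sketch of the invariant these changes restore, assuming
the putOpInsertMap/OperatorFactory signatures used in the surrounding diff:

    // Hedged sketch: the RowSchema attached to an operator should be derived
    // from the SAME RowResolver the operator is registered with.
    private Operator<?> makeReduceSink(ReduceSinkDesc rsDesc, RowResolver rsRR,
        Operator<?> input) {
      return putOpInsertMap(OperatorFactory.getAndMakeChild(
          rsDesc, new RowSchema(rsRR.getColumnInfos()), input), rsRR);
    }
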
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Wed Apr 17 07:29:38 2013
@@ -237,10 +237,24 @@ public class WindowingSpec {
OrderSpec orderSpec = wdwSpec.getOrder();
if ( wFrame == null ) {
if (!supportsWindowing ) {
- wFrame = new WindowFrameSpec(
- new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT),
- new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT)
- );
+
+ if ( wFn.getName().toLowerCase().equals(FunctionRegistry.LAST_VALUE_FUNC_NAME)
+ && orderSpec != null ) {
+ /*
+ * last_value: when a Sort Key is specified, last_value should return the
+ * last value among rows with the same Sort Key value.
+ */
+ wFrame = new WindowFrameSpec(
+ new CurrentRowSpec(),
+ new RangeBoundarySpec(Direction.FOLLOWING, 0)
+ );
+ }
+ else {
+ wFrame = new WindowFrameSpec(
+ new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT),
+ new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT)
+ );
+ }
}
else if ( orderSpec == null ) {
wFrame = new WindowFrameSpec(
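
In effect, the hunk above changes the default frame only for last_value when a
sort key is present: instead of spanning the whole partition, the frame runs
from the current row to the end of its peer group, so last_value returns the
last value among rows sharing the current sort-key value. A condensed sketch of
the rule (class and constant names taken from the diff; the WindowFunctionSpec
parameter type is an assumption, so treat this as illustrative, not the exact
method):

    // Hedged sketch of the default-frame rule introduced above.
    private WindowFrameSpec defaultFrame(WindowFunctionSpec wFn, OrderSpec orderSpec) {
      if (wFn.getName().toLowerCase().equals(FunctionRegistry.LAST_VALUE_FUNC_NAME)
          && orderSpec != null) {
        // current row through the last row with the same sort-key value
        return new WindowFrameSpec(new CurrentRowSpec(),
            new RangeBoundarySpec(Direction.FOLLOWING, 0));
      }
      // otherwise: the entire partition
      return new WindowFrameSpec(
          new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT),
          new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT));
    }
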
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java Wed Apr 17 07:29:38 2013
@@ -187,9 +187,10 @@ public class ExprNodeDescUtils {
return result;
}
- private static ExprNodeDesc backtrack(ExprNodeDesc source, Operator<?> current,
+ public static ExprNodeDesc backtrack(ExprNodeDesc source, Operator<?> current,
Operator<?> terminal) throws SemanticException {
- if (current == null || current == terminal) {
+ Operator<?> parent = getSingleParent(current, terminal);
+ if (parent == null) {
return source;
}
if (source instanceof ExprNodeGenericFuncDesc) {
@@ -200,7 +201,7 @@ public class ExprNodeDescUtils {
}
if (source instanceof ExprNodeColumnDesc) {
ExprNodeColumnDesc column = (ExprNodeColumnDesc) source;
- return backtrack(column, current, terminal);
+ return backtrack(column, parent, terminal);
}
if (source instanceof ExprNodeFieldDesc) {
// field expression should be resolved
@@ -215,20 +216,19 @@ public class ExprNodeDescUtils {
// Resolve column expression to input expression by using expression mapping in current operator
private static ExprNodeDesc backtrack(ExprNodeColumnDesc column, Operator<?> current,
Operator<?> terminal) throws SemanticException {
- if (current == null || current == terminal) {
- return column;
- }
- Operator<?> parent = getSingleParent(current, terminal);
Map<String, ExprNodeDesc> mapping = current.getColumnExprMap();
if (mapping == null || !mapping.containsKey(column.getColumn())) {
- return backtrack(column, parent, terminal); // forward
+ return backtrack((ExprNodeDesc)column, current, terminal);
}
ExprNodeDesc mapped = mapping.get(column.getColumn());
- return backtrack(mapped, parent, terminal); // forward with resolved expr
+ return backtrack(mapped, current, terminal);
}
public static Operator<?> getSingleParent(Operator<?> current, Operator<?> terminal)
throws SemanticException {
+ if (current == terminal) {
+ return null;
+ }
List<Operator<?>> parents = current.getParentOperators();
if (parents == null || parents.isEmpty()) {
if (terminal != null) {
@@ -236,9 +236,12 @@ public class ExprNodeDescUtils {
}
return null;
}
- if (current.getParentOperators().size() > 1) {
- throw new SemanticException("Met multiple parent operators");
+ if (parents.size() == 1) {
+ return parents.get(0);
+ }
+ if (terminal != null && parents.contains(terminal)) {
+ return terminal;
}
- return parents.get(0);
+ throw new SemanticException("Met multiple parent operators");
}
}
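
The reworked backtrack/getSingleParent pair above walks an expression from a
child operator back toward a terminal ancestor, resolving column references
through each operator's columnExprMap along the way (and now stopping cleanly
when the terminal itself is one of several parents). A hedged usage sketch,
with hypothetical operator variables:

    // Hypothetical usage: resolve a child ReduceSink's key expression into the
    // equivalent expression over the parent ReduceSink's output. childRS and
    // parentRS are placeholders for operators taken from an actual plan;
    // backtrack() throws SemanticException if it meets multiple parents.
    ExprNodeDesc childKey = childRS.getConf().getKeyCols().get(0);
    ExprNodeDesc resolved = ExprNodeDescUtils.backtrack(childKey, childRS, parentRS);
    // resolved can then be compared (e.g. via ExprNodeDescUtils.indexOf)
    // against parentRS's own key columns to decide whether the sinks match.
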
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java?rev=1468783&r1=1468782&r2=1468783&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java Wed Apr 17 07:29:38 2013
@@ -81,6 +81,10 @@ public class JoinDesc extends AbstractOp
protected Byte[] tagOrder;
private TableDesc keyTableDesc;
+ // this operator cannot be converted to a mapjoin because its output is expected to be
+ // sorted on the join key; it results from the RS-dedup optimization, which removes the
+ // following RS under some conditions
+ private boolean fixedAsSorted;
+
public JoinDesc() {
}
@@ -525,4 +529,12 @@ public class JoinDesc extends AbstractOp
}
return result;
}
+
+ public boolean isFixedAsSorted() {
+ return fixedAsSorted;
+ }
+
+ public void setFixedAsSorted(boolean fixedAsSorted) {
+ this.fixedAsSorted = fixedAsSorted;
+ }
}
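
The new fixedAsSorted flag gives the map-join conversion path a way to skip
joins whose sorted output the RS-dedup optimization now relies on. A hedged
sketch of the expected check (the conversion code itself is not part of this
diff, so the surrounding method is hypothetical):

    // Hypothetical guard in a map-join conversion pass.
    JoinDesc joinDesc = joinOp.getConf();
    if (joinDesc.isFixedAsSorted()) {
      // RS-dedup removed the downstream ReduceSink, so this join's output
      // must remain sorted on the join key; converting to map-join would
      // break that ordering.
      return false;  // do not convert
    }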