Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/16 03:22:02 UTC
svn commit: r1514554 [7/18] - in /hive/branches/vectorization: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
cli/src/test/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/NonBlockingOpDeDupProc.java Fri Aug 16 01:21:54 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -49,8 +50,11 @@ import org.apache.hadoop.hive.ql.plan.Ex
*/
public class NonBlockingOpDeDupProc implements Transform {
+ private ParseContext pctx;
+
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
+ this.pctx = pctx;
String SEL = SelectOperator.getOperatorName();
String FIL = FilterOperator.getOperatorName();
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
@@ -66,7 +70,7 @@ public class NonBlockingOpDeDupProc impl
return pctx;
}
- static class SelectDedup implements NodeProcessor {
+ private class SelectDedup implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
@@ -86,19 +90,42 @@ public class NonBlockingOpDeDupProc impl
Set<String> funcOutputs = getFunctionOutputs(
pSEL.getConf().getOutputColumnNames(), pSEL.getConf().getColList());
- List<ExprNodeDesc> sources = cSEL.getConf().getColList();
- if (!funcOutputs.isEmpty() && !checkReferences(sources, funcOutputs)) {
+ List<ExprNodeDesc> cSELColList = cSEL.getConf().getColList();
+ List<String> cSELOutputColumnNames = cSEL.getConf().getOutputColumnNames();
+ if (!funcOutputs.isEmpty() && !checkReferences(cSELColList, funcOutputs)) {
return null;
}
- pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(sources, cSEL, pSEL));
- pSEL.getConf().setOutputColumnNames(cSEL.getConf().getOutputColumnNames());
-
- // updates schema only (this should be the last optimizer modifying operator tree)
+ if (cSEL.getColumnExprMap() == null) {
+ // If the child SelectOperator does not have the ColumnExprMap,
+ // we do not need to update the ColumnExprMap in the parent SelectOperator.
+ pSEL.getConf().setColList(ExprNodeDescUtils.backtrack(cSELColList, cSEL, pSEL));
+ pSEL.getConf().setOutputColumnNames(cSELOutputColumnNames);
+ } else {
+ // If the child SelectOperator has the ColumnExprMap,
+ // we need to update the ColumnExprMap in the parent SelectOperator.
+ List<ExprNodeDesc> newPSELColList = new ArrayList<ExprNodeDesc>();
+ List<String> newPSELOutputColumnNames = new ArrayList<String>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ for (int i = 0; i < cSELOutputColumnNames.size(); i++) {
+ String outputColumnName = cSELOutputColumnNames.get(i);
+ ExprNodeDesc cSELExprNodeDesc = cSELColList.get(i);
+ ExprNodeDesc newPSELExprNodeDesc =
+ ExprNodeDescUtils.backtrack(cSELExprNodeDesc, cSEL, pSEL);
+ newPSELColList.add(newPSELExprNodeDesc);
+ newPSELOutputColumnNames.add(outputColumnName);
+ colExprMap.put(outputColumnName, newPSELExprNodeDesc);
+ }
+ pSEL.getConf().setColList(newPSELColList);
+ pSEL.getConf().setOutputColumnNames(newPSELOutputColumnNames);
+ pSEL.setColumnExprMap(colExprMap);
+ }
pSEL.setSchema(cSEL.getSchema());
}
pSEL.getConf().setSelectStar(cSEL.getConf().isSelectStar());
-
+ // We need to use the OpParseContext of the child SelectOperator to replace
+ // the OpParseContext of the parent SelectOperator.
+ pctx.updateOpParseCtx(pSEL, pctx.removeOpParseCtx(cSEL));
pSEL.removeChildAndAdoptItsChildren(cSEL);
cSEL.setParentOperators(null);
cSEL.setChildOperators(null);
@@ -148,7 +175,7 @@ public class NonBlockingOpDeDupProc impl
}
}
- static class FilterDedup implements NodeProcessor {
+ private class FilterDedup implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Fri Aug 16 01:21:54 2013
@@ -142,8 +142,8 @@ public class SimpleFetchOptimizer implem
}
if (aggressive || bypassFilter) {
PrunedPartitionList pruned = pctx.getPrunedPartitions(alias, ts);
- if (aggressive || pruned.getUnknownPartns().isEmpty()) {
- bypassFilter &= pruned.getUnknownPartns().isEmpty();
+ if (aggressive || !pruned.hasUnknownPartitions()) {
+ bypassFilter &= !pruned.hasUnknownPartitions();
return checkOperators(new FetchData(pruned, splitSample), ts, aggressive, bypassFilter);
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java Fri Aug 16 01:21:54 2013
@@ -60,10 +60,7 @@ implements BigTableSelectorForAutoSMJ {
}
else {
// For partitioned tables, get the size of all the partitions
- PrunedPartitionList partsList =
- PartitionPruner.prune(parseCtx.getTopToTable().get(topOp),
- parseCtx.getOpToPartPruner().get(topOp), parseCtx.getConf(),
- null, parseCtx.getPrunedPartitions());
+ PrunedPartitionList partsList = PartitionPruner.prune(topOp, parseCtx, null);
for (Partition part : partsList.getNotDeniedPartns()) {
currentSize += getSize(conf, part);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java Fri Aug 16 01:21:54 2013
@@ -55,8 +55,8 @@ public class QueryPlanTreeTransformation
throws SemanticException {
int newTag = bottomRSToNewTag.get(rsop);
int oldTag = rsop.getConf().getTag();
- // if this child of dispatcher does not use tag, we just set the oldTag to 0;
if (oldTag == -1) {
+ // if this child of DemuxOperator does not use tag, we just set the oldTag to 0.
oldTag = 0;
}
Operator<? extends OperatorDesc> child = CorrelationUtilities.getSingleChild(rsop, true);
@@ -68,7 +68,8 @@ public class QueryPlanTreeTransformation
rsop.getConf().setTag(newTag);
}
- /** Based on the correlation, we transform the query plan tree (operator tree).
+ /**
+ * Based on the correlation, we transform the query plan tree (operator tree).
* Here, we first create the DemuxOperator, and all bottom ReduceSinkOperators
* (bottom means near the TableScanOperator) in the correlation will be
* the parents of the DemuxOperator. We also reassign tags to those
@@ -203,7 +204,6 @@ public class QueryPlanTreeTransformation
childOP.setParentOperators(Utilities.makeList(mux));
parentOp.setChildOperators(Utilities.makeList(mux));
} else {
- // childOp is a JoinOperator
List<Operator<? extends OperatorDesc>> parentsOfMux =
new ArrayList<Operator<? extends OperatorDesc>>();
List<Operator<? extends OperatorDesc>> siblingOPs =
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/LBPartitionProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/LBPartitionProcFactory.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/LBPartitionProcFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/LBPartitionProcFactory.java Fri Aug 16 01:21:54 2013
@@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
/**
* Walk through top operators in tree to find all partitions.
@@ -55,25 +54,14 @@ public class LBPartitionProcFactory exte
//Run partition pruner to get partitions
ParseContext parseCtx = owc.getParseContext();
- PrunedPartitionList prunedPartList = parseCtx.getOpToPartList().get(top);
- if (prunedPartList == null) {
- // We never pruned the partition. Try to prune it.
- ExprNodeDesc ppr_pred = parseCtx.getOpToPartPruner().get(top);
- if (ppr_pred != null) {
- try {
- prunedPartList = PartitionPruner.prune(parseCtx.getTopToTable().get(top),
- ppr_pred, parseCtx.getConf(),
- (String) parseCtx.getTopOps().keySet()
- .toArray()[0], parseCtx.getPrunedPartitions());
- if (prunedPartList != null) {
- owc.getParseContext().getOpToPartList().put(top, prunedPartList);
- }
- } catch (HiveException e) {
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- throw new SemanticException(e.getMessage(), e);
- }
- }
+ PrunedPartitionList prunedPartList;
+ try {
+ String alias = (String) parseCtx.getTopOps().keySet().toArray()[0];
+ prunedPartList = PartitionPruner.prune(top, parseCtx, alias);
+ } catch (HiveException e) {
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ throw new SemanticException(e.getMessage(), e);
}
if (prunedPartList != null) {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java Fri Aug 16 01:21:54 2013
@@ -61,9 +61,7 @@ public class ListBucketingPruner impleme
PrunedPartitionList partsList = ((LBOpPartitionWalkerCtx) opPartWalkerCtx).getPartitions();
if (partsList != null) {
- Set<Partition> parts = null;
- parts = partsList.getConfirmedPartns();
- parts.addAll(partsList.getUnknownPartns());
+ Set<Partition> parts = partsList.getPartitions();
if ((parts != null) && (parts.size() > 0)) {
for (Partition part : parts) {
// only process partition which is skewed and list bucketed
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcCtx.java Fri Aug 16 01:21:54 2013
@@ -22,21 +22,30 @@ import java.util.List;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
/**
* The processor context for the partition condition remover. This contains
* the partitions pruned for the table scan and the table alias.
*/
public class PcrExprProcCtx implements NodeProcessorCtx {
+
/**
* The table alias that is being currently processed.
*/
private final String tabAlias;
private final List<Partition> partList;
+ private final List<VirtualColumn> vcs;
public PcrExprProcCtx(String tabAlias, List<Partition> partList) {
+ this(tabAlias, partList, null);
+ }
+
+ public PcrExprProcCtx(String tabAlias, List<Partition> partList, List<VirtualColumn> vcs) {
+ super();
this.tabAlias = tabAlias;
this.partList = partList;
+ this.vcs = vcs;
}
public String getTabAlias() {
@@ -46,4 +55,8 @@ public class PcrExprProcCtx implements N
public List<Partition> getPartList() {
return partList;
}
+
+ public List<VirtualColumn> getVirtualColumns() {
+ return vcs;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrExprProcFactory.java Fri Aug 16 01:21:54 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartExprEvalUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -57,7 +58,8 @@ import org.apache.hadoop.hive.serde2.typ
* It also generates node by Modifying expr trees with partition conditions removed
*/
public final class PcrExprProcFactory {
- static Object evalExprWithPart(ExprNodeDesc expr, Partition p) throws SemanticException {
+ static Object evalExprWithPart(ExprNodeDesc expr, Partition p, List<VirtualColumn> vcs)
+ throws SemanticException {
StructObjectInspector rowObjectInspector;
Table tbl = p.getTable();
LinkedHashMap<String, String> partSpec = p.getSpec();
@@ -70,7 +72,7 @@ public final class PcrExprProcFactory {
}
try {
- return PartExprEvalUtils.evalExprWithPart(expr, partSpec, rowObjectInspector);
+ return PartExprEvalUtils.evalExprWithPart(expr, partSpec, vcs, rowObjectInspector);
} catch (HiveException e) {
throw new SemanticException(e);
}
@@ -323,7 +325,8 @@ public final class PcrExprProcFactory {
// a result, we update the state of the node to be TRUE or FALSE
Boolean[] results = new Boolean[ctx.getPartList().size()];
for (int i = 0; i < ctx.getPartList().size(); i++) {
- results[i] = (Boolean) evalExprWithPart(fd, ctx.getPartList().get(i));
+ results[i] = (Boolean) evalExprWithPart(fd, ctx.getPartList().get(i),
+ ctx.getVirtualColumns());
}
return getResultWrapFromResults(results, fd, nodeOutputs);
}
@@ -333,7 +336,7 @@ public final class PcrExprProcFactory {
// to be a CONSTANT node with value to be the agreed result.
Object[] results = new Object[ctx.getPartList().size()];
for (int i = 0; i < ctx.getPartList().size(); i++) {
- results[i] = evalExprWithPart(fd, ctx.getPartList().get(i));
+ results[i] = evalExprWithPart(fd, ctx.getPartList().get(i), ctx.getVirtualColumns());
}
Object result = ifResultsAgree(results);
if (result == null) {
@@ -421,17 +424,19 @@ public final class PcrExprProcFactory {
* @param tabAlias
* the table alias
* @param parts
- * the list of all pruned partitions for the
+ * the list of all pruned partitions for the table
+ * @param vcs
+ * virtual columns referenced
* @param pred
* expression tree of the target filter operator
* @return the node information of the root expression
* @throws SemanticException
*/
public static NodeInfoWrapper walkExprTree(
- String tabAlias, ArrayList<Partition> parts, ExprNodeDesc pred)
+ String tabAlias, ArrayList<Partition> parts, List<VirtualColumn> vcs, ExprNodeDesc pred)
throws SemanticException {
// Create the walker, the rules dispatcher and the context.
- PcrExprProcCtx pprCtx = new PcrExprProcCtx(tabAlias, parts);
+ PcrExprProcCtx pprCtx = new PcrExprProcCtx(tabAlias, parts, vcs);
Map<Rule, NodeProcessor> exprRules = new LinkedHashMap<Rule, NodeProcessor>();
exprRules.put(
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrOpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrOpProcFactory.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrOpProcFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/pcr/PcrOpProcFactory.java Fri Aug 16 01:21:54 2013
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -100,27 +100,15 @@ public final class PcrOpProcFactory {
}
- PrunedPartitionList prunedPartList = owc.getParseContext().getOpToPartList().get(top);
- if (prunedPartList == null) {
- // We never pruned the partition. Try to prune it.
- ExprNodeDesc ppr_pred = owc.getParseContext().getOpToPartPruner().get(top);
- if (ppr_pred == null) {
- // no partition predicate found, skip.
- return null;
- }
- try {
- prunedPartList = PartitionPruner.prune(owc.getParseContext().getTopToTable().get(top),
- ppr_pred, owc.getParseContext().getConf(),
- (String) owc.getParseContext().getTopOps().keySet()
- .toArray()[0], owc.getParseContext().getPrunedPartitions());
- if (prunedPartList != null) {
- owc.getParseContext().getOpToPartList().put(top, prunedPartList);
- }
- } catch (HiveException e) {
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- throw new SemanticException(e.getMessage(), e);
- }
+ ParseContext pctx = owc.getParseContext();
+ PrunedPartitionList prunedPartList;
+ try {
+ String alias = (String) owc.getParseContext().getTopOps().keySet().toArray()[0];
+ prunedPartList = pctx.getPrunedPartitions(alias, top);
+ } catch (HiveException e) {
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ throw new SemanticException(e.getMessage(), e);
}
// Otherwise this is not a sampling predicate. We need to process it.
@@ -132,22 +120,16 @@ public final class PcrOpProcFactory {
return null;
}
- for (Partition p : prunedPartList.getConfirmedPartns()) {
- if (!p.getTable().isPartitioned()) {
- return null;
- }
- }
- for (Partition p : prunedPartList.getUnknownPartns()) {
+ for (Partition p : prunedPartList.getPartitions()) {
if (!p.getTable().isPartitioned()) {
return null;
}
}
- partitions.addAll(prunedPartList.getConfirmedPartns());
- partitions.addAll(prunedPartList.getUnknownPartns());
+ partitions.addAll(prunedPartList.getPartitions());
PcrExprProcFactory.NodeInfoWrapper wrapper = PcrExprProcFactory.walkExprTree(
- alias, partitions, predicate);
+ alias, partitions, top.getConf().getVirtualCols(), predicate);
if (wrapper.state == PcrExprProcFactory.WalkState.TRUE) {
owc.getOpToRemove().add(new PcrOpWalkerCtx.OpToDeleteInfo(pop, fop));
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Fri Aug 16 01:21:54 2013
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.exec.Co
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -54,7 +56,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
/*
@@ -109,182 +110,75 @@ public class CommonJoinTaskDispatcher ex
super(context);
}
- // Get the position of the big table for this join operator and the given alias
- private int getPosition(MapWork work, Operator<? extends OperatorDesc> joinOp,
- String alias) {
- Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
-
- // reduceSinkOperator's child is null, but joinOperator's parents is reduceSink
- while ((parentOp.getChildOperators() != null) &&
- (!parentOp.getChildOperators().isEmpty())) {
- parentOp = parentOp.getChildOperators().get(0);
- }
-
- return joinOp.getParentOperators().indexOf(parentOp);
- }
-
- /*
- * A task and its child task has been converted from join to mapjoin.
- * See if the two tasks can be merged.
+ /**
+ * Calculate the total size of local tables in localWork.
+ * @param localWork
+ * @return the total size of local tables, or -1 if the total
+ * size is unknown.
*/
- private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
- MapRedTask childTask = (MapRedTask) task.getChildTasks().get(0);
- MapWork work = task.getWork().getMapWork();
- MapredLocalWork localWork = work.getMapLocalWork();
- MapWork childWork = childTask.getWork().getMapWork();
- MapredLocalWork childLocalWork = childWork.getMapLocalWork();
-
- // Can this be merged
- Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
- if (aliasToWork.size() > 1) {
- return;
- }
-
- Operator<? extends OperatorDesc> op = aliasToWork.values().iterator().next();
- while (op.getChildOperators() != null) {
- // Dont perform this optimization for multi-table inserts
- if (op.getChildOperators().size() > 1) {
- return;
- }
- op = op.getChildOperators().get(0);
- }
-
- if (!(op instanceof FileSinkOperator)) {
- return;
- }
-
- FileSinkOperator fop = (FileSinkOperator) op;
- String workDir = fop.getConf().getDirName();
-
- Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
- if (childPathToAliases.size() > 1) {
- return;
- }
-
- // The filesink writes to a different directory
- if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
- return;
- }
-
- // Either of them should not be bucketed
- if ((localWork.getBucketMapjoinContext() != null) ||
- (childLocalWork.getBucketMapjoinContext() != null)) {
- return;
- }
-
- // Merge the trees
- if (childWork.getAliasToWork().size() > 1) {
- return;
- }
-
- long mapJoinSize = HiveConf.getLongVar(conf,
- HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+ private long calculateLocalTableTotalSize(MapredLocalWork localWork) {
long localTableTotalSize = 0;
- for (String alias : localWork.getAliasToWork().keySet()) {
- Long tabSize = aliasToSize.get(alias);
- if (tabSize == null) {
- /*
- * if the size is unavailable, we need to assume a size 1 greater than mapJoinSize
- * this implies that merge cannot happen so we can return.
- */
- return;
- }
- localTableTotalSize += tabSize;
+ if (localWork == null) {
+ return localTableTotalSize;
}
-
- for (String alias : childLocalWork.getAliasToWork().keySet()) {
+ for (String alias : localWork.getAliasToWork().keySet()) {
Long tabSize = aliasToSize.get(alias);
if (tabSize == null) {
- /*
- * if the size is unavailable, we need to assume a size 1 greater than mapJoinSize
- * this implies that merge cannot happen so we can return.
- */
- return;
+ // if the size of a table is unavailable, the total size cannot be
+ // determined, which implies that the merge cannot happen,
+ // so we will return -1.
+ return -1;
}
localTableTotalSize += tabSize;
- if (localTableTotalSize > mapJoinSize) {
- return;
- }
- }
-
- // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
- // top of the second
- Operator<? extends Serializable> childAliasOp =
- childWork.getAliasToWork().values().iterator().next();
- if (fop.getParentOperators().size() > 1) {
- return;
}
- Operator<? extends Serializable> parentFOp = fop.getParentOperators().get(0);
- // remove the unnecessary TableScan
- if (childAliasOp instanceof TableScanOperator) {
- TableScanOperator tso = (TableScanOperator)childAliasOp;
- if (tso.getNumChild() != 1) {
- // shouldn't happen
- return;
- }
- childAliasOp = tso.getChildOperators().get(0);
- childAliasOp.replaceParent(tso, parentFOp);
- } else {
- childAliasOp.setParentOperators(Utilities.makeList(parentFOp));
- }
- parentFOp.replaceChild(fop, childAliasOp);
+ return localTableTotalSize;
+ }
- work.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
- for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
- .entrySet()) {
- if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
- work.getPathToPartitionInfo().put(childWorkEntry.getKey(), childWorkEntry.getValue());
+ /**
+ * Check if the total size of local tables will be under
+ * the limit after we merge the given localWorks.
+ * The limit of the total size of local tables is defined by
+ * HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD.
+ * @param conf
+ * @param localWorks
+ * @return true if the total size of local tables is under the limit; false otherwise.
+ */
+ private boolean isLocalTableTotalSizeUnderLimitAfterMerge(
+ Configuration conf,
+ MapredLocalWork... localWorks) {
+ final long localTableTotalSizeLimit = HiveConf.getLongVar(conf,
+ HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+ long localTableTotalSize = 0;
+ for (int i = 0; i < localWorks.length; i++) {
+ final long localWorkTableTotalSize = calculateLocalTableTotalSize(localWorks[i]);
+ if (localWorkTableTotalSize < 0) {
+ // The total size of local tables in localWork[i] is unknown.
+ return false;
}
+ localTableTotalSize += localWorkTableTotalSize;
}
- localWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
- localWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
-
- // remove the child task
- List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
- task.setChildTasks(oldChildTasks);
- if (oldChildTasks != null) {
- for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
- oldChildTask.getParentTasks().remove(childTask);
- oldChildTask.getParentTasks().add(task);
- }
+ if (localTableTotalSize > localTableTotalSizeLimit) {
+ // The total size of local tables after we merge localWorks
+ // is larger than the limit set by
+ // HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD.
+ return false;
}
- boolean convertToSingleJob = HiveConf.getBoolVar(conf,
- HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
- if (convertToSingleJob) {
- copyReducerConf(task, childTask);
- }
+ return true;
}
- /**
- * Copy reducer configuration if the childTask also has a reducer.
- *
- * @param task
- * @param childTask
- */
- private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
- MapredWork mrChildWork = childTask.getWork();
- ReduceWork childWork = childTask.getWork().getReduceWork();
- if (childWork == null) {
- return;
- }
+ // Get the position of the big table for this join operator and the given alias
+ private int getPosition(MapWork work, Operator<? extends OperatorDesc> joinOp,
+ String alias) {
+ Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
- Operator childReducer = childWork.getReducer();
- MapredWork work = task.getWork();
- if (childReducer == null) {
- return;
+ // reduceSinkOperator's child is null, but joinOperator's parents is reduceSink
+ while ((parentOp.getChildOperators() != null) &&
+ (!parentOp.getChildOperators().isEmpty())) {
+ parentOp = parentOp.getChildOperators().get(0);
}
- ReduceWork rWork = new ReduceWork();
- work.setReduceWork(rWork);
- rWork.setReducer(childReducer);
- rWork.setNumReduceTasks(childWork.getNumReduceTasks());
- work.getMapWork().setJoinTree(mrChildWork.getMapWork().getJoinTree());
- rWork.setNeedsTagging(childWork.getNeedsTagging());
-
- // Make sure the key configuration is correct, clear and regenerate.
- rWork.getTagToValueDesc().clear();
- GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
+ return joinOp.getParentOperators().indexOf(parentOp);
}
// create map join task and set big table as bigTablePosition
@@ -305,129 +199,165 @@ public class CommonJoinTaskDispatcher ex
* A task and its child task has been converted from join to mapjoin.
* See if the two tasks can be merged.
*/
- private void mergeMapJoinTaskWithMapReduceTask(MapRedTask mapJoinTask, Configuration conf) {
+ private void mergeMapJoinTaskIntoItsChildMapRedTask(MapRedTask mapJoinTask, Configuration conf)
+ throws SemanticException {
+ // Step 1: Check if mapJoinTask has a single child.
+ // If so, check if we can merge mapJoinTask into that child.
if (mapJoinTask.getChildTasks() == null
|| mapJoinTask.getChildTasks().size() > 1) {
// No child-task to merge, nothing to do or there are more than one
// child-tasks in which case we don't want to do anything.
return;
}
- Task<? extends Serializable> firstChildTask = mapJoinTask.getChildTasks().get(0);
- if (!(firstChildTask instanceof MapRedTask)) {
- // Nothing to do if it is not a mapreduce task.
- return;
- }
- MapRedTask childTask = (MapRedTask) firstChildTask;
- MapWork mapJoinWork = mapJoinTask.getWork().getMapWork();
- MapredWork childWork = childTask.getWork();
- if (childWork.getReduceWork() == null) {
- // Not a MR job, nothing to merge.
- return;
- }
- // Can this be merged
- Map<String, Operator<? extends OperatorDesc>> aliasToWork = mapJoinWork.getAliasToWork();
- if (aliasToWork.size() > 1) {
- return;
- }
- Map<String, ArrayList<String>> childPathToAliases = childWork.getMapWork().getPathToAliases();
- if (childPathToAliases.size() > 1) {
+ Task<? extends Serializable> childTask = mapJoinTask.getChildTasks().get(0);
+ if (!(childTask instanceof MapRedTask)) {
+ // Nothing to do if it is not a MapReduce task.
return;
}
- // Locate leaf operator of the map-join task. Start by initializing leaf
- // operator to be root operator.
- Operator<? extends OperatorDesc> mapJoinLeafOperator = aliasToWork.values().iterator().next();
- while (mapJoinLeafOperator.getChildOperators() != null) {
- // Dont perform this optimization for multi-table inserts
- if (mapJoinLeafOperator.getChildOperators().size() > 1) {
- return;
- }
- mapJoinLeafOperator = mapJoinLeafOperator.getChildOperators().get(0);
- }
+ MapRedTask childMapRedTask = (MapRedTask) childTask;
+ MapWork mapJoinMapWork = mapJoinTask.getWork().getMapWork();
+ MapWork childMapWork = childMapRedTask.getWork().getMapWork();
- assert (mapJoinLeafOperator instanceof FileSinkOperator);
- if (!(mapJoinLeafOperator instanceof FileSinkOperator)) {
- // Sanity check, shouldn't happen.
+ Map<String, Operator<? extends OperatorDesc>> mapJoinAliasToWork =
+ mapJoinMapWork.getAliasToWork();
+ if (mapJoinAliasToWork.size() > 1) {
+ // Do not merge if the MapredWork of MapJoin has multiple input aliases.
return;
}
- FileSinkOperator mapJoinTaskFileSinkOperator = (FileSinkOperator) mapJoinLeafOperator;
+ Entry<String, Operator<? extends OperatorDesc>> mapJoinAliasToWorkEntry =
+ mapJoinAliasToWork.entrySet().iterator().next();
+ String mapJoinAlias = mapJoinAliasToWorkEntry.getKey();
+ TableScanOperator mapJoinTaskTableScanOperator =
+ OperatorUtils.findSingleOperator(
+ mapJoinAliasToWorkEntry.getValue(), TableScanOperator.class);
+ if (mapJoinTaskTableScanOperator == null) {
+ throw new SemanticException("Expected a " + TableScanOperator.getOperatorName() +
+ " operator as the work associated with alias " + mapJoinAlias +
+ ". Found a " + mapJoinAliasToWork.get(mapJoinAlias).getName() + " operator.");
+ }
+ FileSinkOperator mapJoinTaskFileSinkOperator =
+ OperatorUtils.findSingleOperator(
+ mapJoinTaskTableScanOperator, FileSinkOperator.class);
+ if (mapJoinTaskFileSinkOperator == null) {
+ throw new SemanticException("Cannot find the " + FileSinkOperator.getOperatorName() +
+ " operator at the last operator of the MapJoin Task.");
+ }
- // The filesink writes to a different directory
- String workDir = mapJoinTaskFileSinkOperator.getConf().getDirName();
- if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
+ // The mapJoinTaskFileSinkOperator writes to a different directory
+ String childMRPath = mapJoinTaskFileSinkOperator.getConf().getDirName();
+ List<String> childMRAliases = childMapWork.getPathToAliases().get(childMRPath);
+ if (childMRAliases == null || childMRAliases.size() != 1) {
return;
}
+ String childMRAlias = childMRAliases.get(0);
- MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
- MapredLocalWork childLocalWork = childWork.getMapWork().getMapLocalWork();
+ MapredLocalWork mapJoinLocalWork = mapJoinMapWork.getMapLocalWork();
+ MapredLocalWork childLocalWork = childMapWork.getMapLocalWork();
- // Either of them should not be bucketed
if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
(childLocalWork != null && childLocalWork.getBucketMapjoinContext() != null)) {
+ // Right now, we do not handle the case that either of them is bucketed.
+ // We should relax this constraint with a follow-up jira.
return;
}
- if (childWork.getMapWork().getAliasToWork().size() > 1) {
- return;
- }
-
- Operator<? extends Serializable> childAliasOp =
- childWork.getMapWork().getAliasToWork().values().iterator().next();
- if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
- return;
- }
-
- // remove the unnecessary TableScan
- if (childAliasOp instanceof TableScanOperator) {
- TableScanOperator tso = (TableScanOperator)childAliasOp;
- if (tso.getNumChild() != 1) {
- // shouldn't happen
- return;
- }
- childAliasOp = tso.getChildOperators().get(0);
- childAliasOp.getParentOperators().remove(tso);
- }
-
- // Merge the 2 trees - remove the FileSinkOperator from the first tree pass it to the
- // top of the second
- Operator<? extends Serializable> parentFOp = mapJoinTaskFileSinkOperator
- .getParentOperators().get(0);
- parentFOp.getChildOperators().remove(mapJoinTaskFileSinkOperator);
- parentFOp.getChildOperators().add(childAliasOp);
- List<Operator<? extends OperatorDesc>> parentOps =
- new ArrayList<Operator<? extends OperatorDesc>>();
- parentOps.add(parentFOp);
- childAliasOp.setParentOperators(parentOps);
-
- mapJoinWork.getAliasToPartnInfo().putAll(childWork.getMapWork().getAliasToPartnInfo());
- for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getMapWork().getPathToPartitionInfo()
- .entrySet()) {
- if (childWork.getMapWork().getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
- mapJoinWork.getPathToPartitionInfo()
- .put(childWorkEntry.getKey(), childWorkEntry.getValue());
+ // We need to check if the total size of local tables is under the limit.
+ // Here, we are using a strong condition, which is the total size of
+ // local tables used by all input paths. Actually, we can relax this condition
+ // to check the total size of local tables for every input path.
+ // Example:
+ // UNION_ALL
+ // / \
+ // / \
+ // / \
+ // / \
+ // MapJoin1 MapJoin2
+ // / | \ / | \
+ // / | \ / | \
+ // Big1 S1 S2 Big2 S3 S4
+ // In this case, we have two MapJoins, MapJoin1 and MapJoin2. Big1 and Big2 are two
+ // big tables, and S1, S2, S3, and S4 are four small tables. Hash tables of S1 and S2
+ // will only be used by Map tasks processing Big1. Hash tables of S3 and S4 will only
+ // be used by Map tasks processing Big2. If Big1!=Big2, we should only check if the size
+ // of S1 + S2 is under the limit, and if the size of S3 + S4 is under the limit.
+ // But, right now, we are checking the size of S1 + S2 + S3 + S4 is under the limit.
+ // If Big1=Big2, we will only scan a path once. So, MapJoin1 and MapJoin2 will be executed
+ // in the same Map task. In this case, we need to make sure the size of S1 + S2 + S3 + S4
+ // is under the limit.
+ if (!isLocalTableTotalSizeUnderLimitAfterMerge(conf, mapJoinLocalWork, childLocalWork)){
+ // The total size of local tables may not be under
+ // the limit after we merge mapJoinLocalWork and childLocalWork.
+ // Do not merge.
+ return;
+ }
+
+ TableScanOperator childMRTaskTableScanOperator =
+ OperatorUtils.findSingleOperator(
+ childMapWork.getAliasToWork().get(childMRAlias), TableScanOperator.class);
+ if (childMRTaskTableScanOperator == null) {
+ throw new SemanticException("Expected a " + TableScanOperator.getOperatorName() +
+ " operator as the work associated with alias " + childMRAlias +
+ ". Found a " + childMapWork.getAliasToWork().get(childMRAlias).getName() + " operator.");
+ }
+
+ List<Operator<? extends OperatorDesc>> parentsInMapJoinTask =
+ mapJoinTaskFileSinkOperator.getParentOperators();
+ List<Operator<? extends OperatorDesc>> childrenInChildMRTask =
+ childMRTaskTableScanOperator.getChildOperators();
+ if (parentsInMapJoinTask.size() > 1 || childrenInChildMRTask.size() > 1) {
+ // Do not merge if we do not know how to connect two operator trees.
+ return;
+ }
+
+ // Step 2: Merge mapJoinTask into the Map-side of its child.
+ // Step 2.1: Connect the operator trees of two MapRedTasks.
+ Operator<? extends OperatorDesc> parentInMapJoinTask = parentsInMapJoinTask.get(0);
+ Operator<? extends OperatorDesc> childInChildMRTask = childrenInChildMRTask.get(0);
+ parentInMapJoinTask.replaceChild(mapJoinTaskFileSinkOperator, childInChildMRTask);
+ childInChildMRTask.replaceParent(childMRTaskTableScanOperator, parentInMapJoinTask);
+
+ // Step 2.2: Replace the corresponding part of childMapWork.
+ GenMapRedUtils.replaceMapWork(mapJoinAlias, childMRAlias, mapJoinMapWork, childMapWork);
+
+ // Step 2.3: Fill up stuff in local work
+ if (mapJoinLocalWork != null) {
+ if (childLocalWork == null) {
+ childMapWork.setMapLocalWork(mapJoinLocalWork);
+ } else {
+ childLocalWork.getAliasToFetchWork().putAll(mapJoinLocalWork.getAliasToFetchWork());
+ childLocalWork.getAliasToWork().putAll(mapJoinLocalWork.getAliasToWork());
+ }
+ }
+
+ // Step 2.4: Remove this MapJoin task
+ List<Task<? extends Serializable>> parentTasks = mapJoinTask.getParentTasks();
+ mapJoinTask.setParentTasks(null);
+ mapJoinTask.setChildTasks(null);
+ childMapRedTask.getParentTasks().remove(mapJoinTask);
+ if (parentTasks != null) {
+ childMapRedTask.getParentTasks().addAll(parentTasks);
+ for (Task<? extends Serializable> parentTask : parentTasks) {
+ parentTask.getChildTasks().remove(mapJoinTask);
+ if (!parentTask.getChildTasks().contains(childMapRedTask)) {
+ parentTask.getChildTasks().add(childMapRedTask);
+ }
}
- }
-
- // Fill up stuff in local work
- if (mapJoinLocalWork != null && childLocalWork != null) {
- mapJoinLocalWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
- mapJoinLocalWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
- }
-
- // remove the child task
- List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
- mapJoinTask.setChildTasks(oldChildTasks);
- if (oldChildTasks != null) {
- for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
- oldChildTask.getParentTasks().remove(childTask);
- oldChildTask.getParentTasks().add(mapJoinTask);
+ } else {
+ if (physicalContext.getRootTasks().contains(mapJoinTask)) {
+ physicalContext.removeFromRootTask(mapJoinTask);
+ if (childMapRedTask.getParentTasks() != null &&
+ childMapRedTask.getParentTasks().size() == 0 &&
+ !physicalContext.getRootTasks().contains(childMapRedTask)) {
+ physicalContext.addToRootTask(childMapRedTask);
+ }
}
}
-
- // Copy the reducer conf.
- copyReducerConf(mapJoinTask, childTask);
+ if (childMapRedTask.getParentTasks().size() == 0) {
+ childMapRedTask.setParentTasks(null);
+ }
}
public static boolean cannotConvert(String bigTableAlias,
@@ -557,20 +487,7 @@ public class CommonJoinTaskDispatcher ex
// Can this task be merged with the child task. This can happen if a big table is being
// joined with multiple small tables on different keys
if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)) {
- if (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP) {
- // Merging two map-join tasks
- mergeMapJoinTaskWithChildMapJoinTask(newTask, conf);
- }
-
- // Converted the join operator into a map-join. Now see if it can
- // be merged into the following map-reduce job.
- boolean convertToSingleJob = HiveConf.getBoolVar(conf,
- HiveConf.ConfVars.HIVEOPTIMIZEMAPJOINFOLLOWEDBYMR);
- if (convertToSingleJob) {
- // Try merging a map-join task with a mapreduce job to have a
- // single job.
- mergeMapJoinTaskWithMapReduceTask(newTask, conf);
- }
+ mergeMapJoinTaskIntoItsChildMapRedTask(newTask, conf);
}
return newTask;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartExprEvalUtils.java Fri Aug 16 01:21:54 2013
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
@@ -44,9 +45,11 @@ public class PartExprEvalUtils {
* @return value returned by the expression
* @throws HiveException
*/
- static synchronized public Object evalExprWithPart(ExprNodeDesc expr, LinkedHashMap<String, String> partSpec,
+ static synchronized public Object evalExprWithPart(ExprNodeDesc expr,
+ LinkedHashMap<String, String> partSpec, List<VirtualColumn> vcs,
StructObjectInspector rowObjectInspector) throws HiveException {
- Object[] rowWithPart = new Object[2];
+ boolean hasVC = vcs != null && !vcs.isEmpty();
+ Object[] rowWithPart = new Object[hasVC ? 3 : 2];
// Create the row object
ArrayList<String> partNames = new ArrayList<String>();
ArrayList<String> partValues = new ArrayList<String>();
@@ -61,10 +64,12 @@ public class PartExprEvalUtils {
.getStandardStructObjectInspector(partNames, partObjectInspectors);
rowWithPart[1] = partValues;
- ArrayList<StructObjectInspector> ois = new ArrayList<StructObjectInspector>(
- 2);
+ ArrayList<StructObjectInspector> ois = new ArrayList<StructObjectInspector>(2);
ois.add(rowObjectInspector);
ois.add(partObjectInspector);
+ if (hasVC) {
+ ois.add(VirtualColumn.getVCSObjectInspector(vcs));
+ }
StructObjectInspector rowWithPartObjectInspector = ObjectInspectorFactory
.getUnionStructObjectInspector(ois);
@@ -79,25 +84,25 @@ public class PartExprEvalUtils {
}
static synchronized public Map<PrimitiveObjectInspector, ExprNodeEvaluator> prepareExpr(
- ExprNodeDesc expr, List<String> partNames,
- StructObjectInspector rowObjectInspector) throws HiveException {
-
+ ExprNodeDesc expr, List<String> partNames, List<VirtualColumn> vcs) throws HiveException {
+ boolean hasVC = vcs != null && !vcs.isEmpty();
// Create the row object
List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
for (int i = 0; i < partNames.size(); i++) {
partObjectInspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
}
- StructObjectInspector partObjectInspector = ObjectInspectorFactory
+ StructObjectInspector objectInspector = ObjectInspectorFactory
.getStandardStructObjectInspector(partNames, partObjectInspectors);
- List<StructObjectInspector> ois = new ArrayList<StructObjectInspector>(2);
- ois.add(rowObjectInspector);
- ois.add(partObjectInspector);
- StructObjectInspector rowWithPartObjectInspector =
- ObjectInspectorFactory.getUnionStructObjectInspector(ois);
+ if (hasVC) {
+ List<StructObjectInspector> ois = new ArrayList<StructObjectInspector>(2);
+ ois.add(objectInspector);
+ ois.add(VirtualColumn.getVCSObjectInspector(vcs));
+ objectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(ois);
+ }
ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(expr);
- ObjectInspector evaluateResultOI = evaluator.initialize(rowWithPartObjectInspector);
+ ObjectInspector evaluateResultOI = evaluator.initialize(objectInspector);
Map<PrimitiveObjectInspector, ExprNodeEvaluator> result =
new HashMap<PrimitiveObjectInspector, ExprNodeEvaluator>();
@@ -106,7 +111,7 @@ public class PartExprEvalUtils {
}
static synchronized public Object evaluateExprOnPart(
- Map<PrimitiveObjectInspector, ExprNodeEvaluator> pair, Object[] rowWithPart)
+ Map<PrimitiveObjectInspector, ExprNodeEvaluator> pair, Object rowWithPart)
throws HiveException {
assert(pair.size() > 0);
// only get the 1st entry from the map
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Fri Aug 16 01:21:54 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer.ppr;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -36,12 +37,14 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.optimizer.PrunerUtils;
import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.parse.ParseContext;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.thrift.TException;
@@ -130,6 +134,17 @@ public class PartitionPruner implements
}
/**
+ * Get the partition list for the TS operator that satisfies the partition pruner
+ * condition.
+ */
+ public static PrunedPartitionList prune(TableScanOperator ts, ParseContext parseCtx,
+ String alias) throws HiveException {
+ return prune(parseCtx.getTopToTable().get(ts),
+ parseCtx.getOpToPartPruner().get(ts), parseCtx.getConf(), alias,
+ ts.getConf().getVirtualCols(), parseCtx.getPrunedPartitions());
+ }
+
+ /**
* Get the partition list for the table that satisfies the partition pruner
* condition.
*
@@ -141,12 +156,16 @@ public class PartitionPruner implements
* for checking whether "strict" mode is on.
* @param alias
* for generating error message only.
+ * @param vcs
+ * virtual columns referenced
+ * @param prunedPartitionsMap
+ * cached result for the table
* @return the partition list for the table that satisfies the partition
* pruner condition.
* @throws HiveException
*/
- public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
- HiveConf conf, String alias,
+ private static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr,
+ HiveConf conf, String alias, List<VirtualColumn> vcs,
Map<String, PrunedPartitionList> prunedPartitionsMap) throws HiveException {
LOG.trace("Started pruning partiton");
LOG.trace("dbname = " + tab.getDbName());
@@ -163,76 +182,7 @@ public class PartitionPruner implements
return ret;
}
- LinkedHashSet<Partition> true_parts = new LinkedHashSet<Partition>();
- LinkedHashSet<Partition> unkn_parts = new LinkedHashSet<Partition>();
- LinkedHashSet<Partition> denied_parts = new LinkedHashSet<Partition>();
-
- try {
- StructObjectInspector rowObjectInspector = (StructObjectInspector) tab
- .getDeserializer().getObjectInspector();
- Object[] rowWithPart = new Object[2];
-
- if (tab.isPartitioned()) {
- // If the "strict" mode is on, we have to provide partition pruner for
- // each table.
- if ("strict".equalsIgnoreCase(HiveConf.getVar(conf,
- HiveConf.ConfVars.HIVEMAPREDMODE))) {
- if (!hasColumnExpr(prunerExpr)) {
- throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE
- .getMsg("for Alias \"" + alias + "\" Table \""
- + tab.getTableName() + "\""));
- }
- }
-
- if (prunerExpr == null) {
- // This can happen when hive.mapred.mode=nonstrict and there is no predicates at all
- // Add all partitions to the unknown_parts so that a MR job is generated.
- true_parts.addAll(Hive.get().getPartitions(tab));
- } else {
- // remove non-partition columns
- ExprNodeDesc compactExpr = prunerExpr.clone();
- compactExpr = compactExpr(compactExpr);
- LOG.debug("Filter w/ compacting: " +
- ((compactExpr != null) ? compactExpr.getExprString(): "null") +
- "; filter w/o compacting: " +
- ((prunerExpr != null) ? prunerExpr.getExprString(): "null"));
- if (compactExpr == null) {
- // This could happen when hive.mapred.mode=nonstrict and all the predicates
- // are on non-partition columns.
- unkn_parts.addAll(Hive.get().getPartitions(tab));
- } else {
- String message = Utilities.checkJDOPushDown(tab, compactExpr, null);
- if (message == null) {
- String filter = compactExpr.getExprString();
- String oldFilter = prunerExpr.getExprString();
-
- if (filter.equals(oldFilter)) {
- // pruneExpr contains only partition columns
- pruneByPushDown(tab, true_parts, filter);
- } else {
- // pruneExpr contains non-partition columns
- pruneByPushDown(tab, unkn_parts, filter);
- }
- } else {
- LOG.info(ErrorMsg.INVALID_JDO_FILTER_EXPRESSION.getMsg("by condition '"
- + message + "'"));
- pruneBySequentialScan(tab, true_parts, unkn_parts, denied_parts,
- prunerExpr, rowObjectInspector, conf);
- }
- }
- }
- LOG.debug("tabname = " + tab.getTableName() + " is partitioned");
- } else {
- true_parts.addAll(Hive.get().getPartitions(tab));
- }
- } catch (HiveException e) {
- throw e;
- } catch (Exception e) {
- throw new HiveException(e);
- }
-
- // Now return the set of partitions
- ret = new PrunedPartitionList(tab, true_parts, unkn_parts, denied_parts);
+ ret = getPartitionsFromServer(tab, prunerExpr, vcs, conf, alias);
prunedPartitionsMap.put(key, ret);
return ret;
}
@@ -253,7 +203,7 @@ public class PartitionPruner implements
GenericUDF udf = ((ExprNodeGenericFuncDesc)expr).getGenericUDF();
if (udf instanceof GenericUDFOPAnd ||
udf instanceof GenericUDFOPOr) {
- List<ExprNodeDesc> children = ((ExprNodeGenericFuncDesc)expr).getChildren();
+ List<ExprNodeDesc> children = expr.getChildren();
ExprNodeDesc left = children.get(0);
children.set(0, compactExpr(left));
ExprNodeDesc right = children.get(1);
@@ -271,112 +221,149 @@ public class PartitionPruner implements
return expr;
}
- /**
- * Pruning partition using JDO filtering.
- * @param tab the table containing the partitions.
- * @param true_parts the resulting partitions.
- * @param filter the SQL predicate that involves only partition columns
- * @throws HiveException
- * @throws MetaException
- * @throws NoSuchObjectException
- * @throws TException
- */
- static private void pruneByPushDown(Table tab, Set<Partition> true_parts, String filter)
- throws HiveException, MetaException, NoSuchObjectException, TException {
- Hive db = Hive.get();
- List<Partition> parts = db.getPartitionsByFilter(tab, filter);
- true_parts.addAll(parts);
- return;
+ private static PrunedPartitionList getPartitionsFromServer(Table tab, ExprNodeDesc prunerExpr,
+ List<VirtualColumn> vcs, HiveConf conf, String alias) throws HiveException {
+ try {
+ if (!tab.isPartitioned()) {
+ // If the table is not partitioned, return everything.
+ return new PrunedPartitionList(
+ tab, new LinkedHashSet<Partition>(Hive.get().getPartitions(tab)), false);
+ }
+ LOG.debug("tabname = " + tab.getTableName() + " is partitioned");
+
+ if ("strict".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEMAPREDMODE))
+ && !hasColumnExpr(prunerExpr)) {
+ // If the "strict" mode is on, we have to provide partition pruner for each table.
+ throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE
+ .getMsg("for Alias \"" + alias + "\" Table \"" + tab.getTableName() + "\""));
+ }
+
+ if (prunerExpr == null) {
+ // This can happen when hive.mapred.mode=nonstrict and there are no predicates at all.
+ return new PrunedPartitionList(tab,
+ new LinkedHashSet<Partition>(Hive.get().getPartitions(tab)), false);
+ }
+
+ // Remove non-partition columns.
+ ExprNodeDesc compactExpr = compactExpr(prunerExpr.clone());
+ String oldFilter = prunerExpr.getExprString();
+ if (compactExpr == null) {
+ // This could happen when hive.mapred.mode=nonstrict and all the predicates
+ // are on non-partition columns.
+ LOG.debug("Filter " + oldFilter + " was null after compacting");
+ return new PrunedPartitionList(
+ tab, new LinkedHashSet<Partition>(Hive.get().getPartitions(tab)), true);
+ }
+
+ Set<Partition> partitions = new LinkedHashSet<Partition>();
+ boolean hasUnknownPartitions = false;
+ String message = Utilities.checkJDOPushDown(tab, compactExpr, null);
+ if (message != null) {
+ LOG.info(ErrorMsg.INVALID_JDO_FILTER_EXPRESSION.getMsg("by condition '"
+ + message + "'"));
+ hasUnknownPartitions = pruneBySequentialScan(tab, partitions, prunerExpr, vcs, conf);
+ } else {
+ String filter = compactExpr.getExprString();
+ LOG.debug("Filter w/ compacting: " + filter +"; filter w/o compacting: " + oldFilter);
+ hasUnknownPartitions = !filter.equals(oldFilter);
+ partitions.addAll(Hive.get().getPartitionsByFilter(tab, filter));
+ }
+ return new PrunedPartitionList(tab, partitions, hasUnknownPartitions);
+ } catch (HiveException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
}
/**
* Prunes partitions by getting the partition names first and then pruning them using the Hive
* expression evaluator.
* @param tab the table containing the partitions.
- * @param true_parts the resulting partitions if the partition pruning expression only contains
- * partition columns.
- * @param unkn_parts the resulting partitions if the partition pruning expression that only contains
- * non-partition columns.
- * @param denied_parts pruned out partitions.
+ * @param partitions the resulting partitions.
* @param prunerExpr the SQL predicate that involves partition columns.
- * @param rowObjectInspector object inspector used by the evaluator
+ * @param vcs virtual columns referenced
* @param conf Hive Configuration object, can not be NULL.
- * @throws Exception
+ * @return true iff the partition pruning expression contains non-partition columns.
*/
- static private void pruneBySequentialScan(Table tab, Set<Partition> true_parts, Set<Partition> unkn_parts,
- Set<Partition> denied_parts, ExprNodeDesc prunerExpr, StructObjectInspector rowObjectInspector, HiveConf conf)
- throws Exception {
-
- List<String> trueNames = null;
- List<String> unknNames = null;
-
+ static private boolean pruneBySequentialScan(Table tab, Set<Partition> partitions,
+ ExprNodeDesc prunerExpr, List<VirtualColumn> vcs, HiveConf conf) throws Exception {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
-
perfLogger.PerfLogBegin(LOG, PerfLogger.PRUNE_LISTING);
- List<String> partNames = Hive.get().getPartitionNames(tab.getDbName(),
- tab.getTableName(), (short) -1);
+ List<String> partNames = Hive.get().getPartitionNames(
+ tab.getDbName(), tab.getTableName(), (short) -1);
+ String defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
List<FieldSchema> pCols = tab.getPartCols();
List<String> partCols = new ArrayList<String>(pCols.size());
- List<String> values = new ArrayList<String>(pCols.size());
- Object[] objectWithPart = new Object[2];
- String defaultPartitionName = conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME);
-
for (FieldSchema pCol : pCols) {
partCols.add(pCol.getName());
}
- Map<PrimitiveObjectInspector, ExprNodeEvaluator> handle = PartExprEvalUtils.prepareExpr(
- prunerExpr, partCols, rowObjectInspector);
-
- for (String partName : partNames) {
+ boolean hasUnknownPartitions = prunePartitionNames(
+ partCols, prunerExpr, vcs, defaultPartitionName, partNames);
+ perfLogger.PerfLogEnd(LOG, PerfLogger.PRUNE_LISTING);
- // Set all the variables here
- LinkedHashMap<String, String> partSpec = Warehouse
- .makeSpecFromName(partName);
+ perfLogger.PerfLogBegin(LOG, PerfLogger.PARTITION_RETRIEVING);
+ if (!partNames.isEmpty()) {
+ partitions.addAll(Hive.get().getPartitionsByNames(tab, partNames));
+ }
+ perfLogger.PerfLogEnd(LOG, PerfLogger.PARTITION_RETRIEVING);
+ return hasUnknownPartitions;
+ }
+ /**
+ * Prunes partition names to see if they match the prune expression.
+ * @param columnNames Names of the partition columns.
+ * @param prunerExpr The expression to match.
+ * @param vcs Virtual columns referenced by the expression.
+ * @param defaultPartitionName Default partition name from the configuration.
+ * @param partNames Partition names to filter. The list is modified in place.
+ * @return Whether the retained list contains partitions for which the expression could not be evaluated (unknown partitions).
+ */
+ public static boolean prunePartitionNames(List<String> columnNames, ExprNodeDesc prunerExpr,
+ List<VirtualColumn> vcs, String defaultPartitionName, List<String> partNames)
+ throws HiveException, MetaException {
+ // Prepare the expression to filter on the columns.
+ Map<PrimitiveObjectInspector, ExprNodeEvaluator> handle =
+ PartExprEvalUtils.prepareExpr(prunerExpr, columnNames, vcs);
+
+ // Filter the name list.
+ List<String> values = new ArrayList<String>(columnNames.size());
+ Object evalArg = values;
+ boolean hasVC = vcs != null && !vcs.isEmpty();
+ if (hasVC) {
+ Object[] objectWithPart = new Object[2];
+ objectWithPart[0] = values;
+ evalArg = objectWithPart;
+ }
+
+ boolean hasUnknownPartitions = false;
+ Iterator<String> partIter = partNames.iterator();
+ while (partIter.hasNext()) {
+ String partName = partIter.next();
+ LinkedHashMap<String, String> partSpec = Warehouse.makeSpecFromName(partName);
values.clear();
- for (Map.Entry<String, String> kv: partSpec.entrySet()) {
- values.add(kv.getValue());
- }
- objectWithPart[1] = values;
+ values.addAll(partSpec.values());
- // evaluate the expression tree
- Boolean r = (Boolean) PartExprEvalUtils.evaluateExprOnPart(handle, objectWithPart);
-
- if (r == null) {
+ // Evaluate the expression tree.
+ Boolean isNeeded = (Boolean)PartExprEvalUtils.evaluateExprOnPart(handle, evalArg);
+ boolean isUnknown = (isNeeded == null);
+ if (!isUnknown && !isNeeded) {
+ partIter.remove();
+ continue;
+ }
+ if (isUnknown && values.contains(defaultPartitionName)) {
// Reject default partitions if we couldn't determine whether they should be included.
// Note that the predicate would only contain the partition column parts of the original predicate.
- if (values.contains(defaultPartitionName)) {
- LOG.debug("skipping default/bad partition: " + partName);
- }else {
- if (unknNames == null) {
- unknNames = new LinkedList<String>();
- }
- unknNames.add(partName);
- LOG.debug("retained unknown partition: " + partName);
- }
- } else if (Boolean.TRUE.equals(r)) {
- if (trueNames == null) {
- trueNames = new LinkedList<String>();
- }
- trueNames.add(partName);
- LOG.debug("retained partition: " + partName);
+ LOG.debug("skipping default/bad partition: " + partName);
+ partIter.remove();
+ continue;
}
+ hasUnknownPartitions |= isUnknown;
+ LOG.debug("retained " + (isUnknown ? "unknown " : "") + "partition: " + partName);
}
- perfLogger.PerfLogEnd(LOG, PerfLogger.PRUNE_LISTING);
-
- perfLogger.PerfLogBegin(LOG, PerfLogger.PARTITION_RETRIEVING);
- if (trueNames != null) {
- List<Partition> parts = Hive.get().getPartitionsByNames(tab, trueNames);
- true_parts.addAll(parts);
- }
- if (unknNames != null) {
- List<Partition> parts = Hive.get().getPartitionsByNames(tab, unknNames);
- unkn_parts.addAll(parts);
- }
- perfLogger.PerfLogEnd(LOG, PerfLogger.PARTITION_RETRIEVING);
+ return hasUnknownPartitions;
}
/**
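For readers following the new control flow above: prunePartitionNames keeps a partition name when the expression evaluates to TRUE, drops it on FALSE, and on a null (undecidable) result keeps it and reports it as unknown, except for the default partition, which is dropped. The standalone sketch below reproduces that contract with plain Java collections; it is illustrative only and uses no Hive classes (the verdict map stands in for evaluating the pruner expression, and the sample partition names are made up).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java analogue (not Hive code) of the keep/drop/unknown contract in prunePartitionNames. */
public class PartNameFilterSketch {
  // 'verdicts' stands in for evaluating the pruner expression on a partition name:
  // Boolean.TRUE = keep, Boolean.FALSE = drop, null = cannot decide (unknown).
  static boolean filterInPlace(List<String> partNames, Map<String, Boolean> verdicts,
      String defaultPartitionName) {
    boolean hasUnknown = false;
    Iterator<String> it = partNames.iterator();
    while (it.hasNext()) {
      String name = it.next();
      Boolean isNeeded = verdicts.get(name);
      boolean isUnknown = (isNeeded == null);
      if (!isUnknown && !isNeeded.booleanValue()) {
        it.remove();                           // the predicate is definitely false here
        continue;
      }
      if (isUnknown && name.contains(defaultPartitionName)) {
        it.remove();                           // undecidable on the default/bad partition
        continue;
      }
      hasUnknown |= isUnknown;                 // kept, but must be re-checked at runtime
    }
    return hasUnknown;
  }

  public static void main(String[] args) {
    List<String> names = new ArrayList<String>(Arrays.asList(
        "ds=2013-08-01", "ds=2013-08-02", "ds=__HIVE_DEFAULT_PARTITION__"));
    Map<String, Boolean> verdicts = new HashMap<String, Boolean>();
    verdicts.put("ds=2013-08-01", Boolean.TRUE);
    verdicts.put("ds=2013-08-02", Boolean.FALSE);
    // no verdict for the default partition -> unknown
    boolean hasUnknown = filterInPlace(names, verdicts, "__HIVE_DEFAULT_PARTITION__");
    System.out.println(names + " hasUnknown=" + hasUnknown);  // [ds=2013-08-01] hasUnknown=false
  }
}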
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Fri Aug 16 01:21:54 2013
@@ -986,13 +986,26 @@ public abstract class BaseSemanticAnalyz
ListBucketingCtx lbCtx = new ListBucketingCtx();
lbCtx.setSkewedColNames(skewedColNames);
lbCtx.setSkewedColValues(skewedValues);
- lbCtx.setLbLocationMap(skewedColValueLocationMaps);
+ lbCtx.setLbLocationMap(convertSkewedValueListToSimpleList(skewedColValueLocationMaps));
lbCtx.setStoredAsSubDirectories(isStoredAsSubDirectories);
lbCtx.setDefaultKey(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_KEY);
lbCtx.setDefaultDirName(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME);
return lbCtx;
}
+ // This is done to avoid the need to send metastore jars to task nodes.
+ private Map<List<String>, String> convertSkewedValueListToSimpleList(
+ Map<SkewedValueList, String> skewedColValueLocationMaps) {
+ if (skewedColValueLocationMaps == null) {
+ return null;
+ }
+ Map<List<String>, String> converted = new HashMap<List<String>, String>();
+ for (Map.Entry<SkewedValueList, String> entry : skewedColValueLocationMaps.entrySet()) {
+ converted.put(entry.getKey().getSkewedValueList(), entry.getValue());
+ }
+ return converted;
+ }
+
/**
* Given an ASTNode, return the list of values.
*
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Fri Aug 16 01:21:54 2013
@@ -425,8 +425,7 @@ public class PTFTranslator {
}
}
- if (RANKING_FUNCS.contains(spec.getName()))
- {
+ if (FunctionRegistry.isRankingFunction(spec.getName())){
setupRankingArgs(wdwTFnDef, def, spec);
}
@@ -785,19 +784,6 @@ public class PTFTranslator {
return combinedOrdExprs;
}
-
- /*
- * Ranking Functions helpers
- */
-
- protected static final ArrayList<String> RANKING_FUNCS = new ArrayList<String>();
- static {
- RANKING_FUNCS.add("rank");
- RANKING_FUNCS.add("dense_rank");
- RANKING_FUNCS.add("percent_rank");
- RANKING_FUNCS.add("cume_dist");
- };
-
private void setupRankingArgs(WindowTableFunctionDef wdwTFnDef,
WindowFunctionDef wFnDef,
WindowFunctionSpec wSpec)
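The hunk above replaces the translator's private RANKING_FUNCS list with a call to FunctionRegistry.isRankingFunction. A minimal stand-in for such a check, covering the same four names the removed list held, is sketched below; the real FunctionRegistry implementation may differ, so treat this only as an illustration of the check being centralized.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Illustrative stand-in (not the actual FunctionRegistry code) for a centralized
 *  ranking-function check over the same four names the removed RANKING_FUNCS list held. */
public class RankingFunctionCheckSketch {
  private static final Set<String> RANKING_FUNCS =
      new HashSet<String>(Arrays.asList("rank", "dense_rank", "percent_rank", "cume_dist"));

  public static boolean isRankingFunction(String name) {
    return name != null && RANKING_FUNCS.contains(name.toLowerCase());
  }

  public static void main(String[] args) {
    System.out.println(isRankingFunction("DENSE_RANK"));  // true
    System.out.println(isRankingFunction("sum"));         // false
  }
}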
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Fri Aug 16 01:21:54 2013
@@ -392,6 +392,27 @@ public class ParseContext {
}
/**
+ * Remove the OpParseContext of a specific operator op
+ * @param op the operator whose entry is removed
+ * @return the removed OpParseContext, or null if op had no entry
+ */
+ public OpParseContext removeOpParseCtx(Operator<? extends OperatorDesc> op) {
+ return opParseCtx.remove(op);
+ }
+
+ /**
+ * Update the OpParseContext of operator op to newOpParseContext.
+ * If op is not in opParseCtx, a new entry will be added into opParseCtx.
+ * The key is op, and the value is newOpParseContext.
+ * @param op
+ * @param newOpParseContext
+ */
+ public void updateOpParseCtx(Operator<? extends OperatorDesc> op,
+ OpParseContext newOpParseContext) {
+ opParseCtx.put(op, newOpParseContext);
+ }
+
+ /**
* @param opParseCtx
* the opParseCtx to set
*/
@@ -619,8 +640,7 @@ public class ParseContext {
throws HiveException {
PrunedPartitionList partsList = opToPartList.get(ts);
if (partsList == null) {
- partsList = PartitionPruner.prune(topToTable.get(ts),
- opToPartPruner.get(ts), conf, alias, prunedPartitions);
+ partsList = PartitionPruner.prune(ts, this, alias);
opToPartList.put(ts, partsList);
}
return partsList;
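A hedged usage sketch of the removeOpParseCtx/updateOpParseCtx helpers added above: an optimizer pass that splices one operator out of the tree can keep the operator-to-parse-context map consistent through the ParseContext API instead of mutating the underlying map directly. The operator variables below are placeholders, not code from this patch.

// Hypothetical optimizer-pass fragment; 'redundantOp' and 'survivingOp' are placeholder
// Operator<? extends OperatorDesc> instances, and 'parseContext' is the current ParseContext.
OpParseContext removed = parseContext.removeOpParseCtx(redundantOp);
if (removed != null) {
  parseContext.updateOpParseCtx(survivingOp, removed);
}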
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java Fri Aug 16 01:21:54 2013
@@ -30,30 +30,19 @@ import org.apache.hadoop.hive.ql.metadat
*/
public class PrunedPartitionList {
- // source table
+ /** Source table. */
private final Table source;
- // confirmed partitions - satisfy the partition criteria
- private Set<Partition> confirmedPartns;
+ /** Partitions that satisfy, or may satisfy, the partition criteria. */
+ private Set<Partition> partitions;
- // unknown partitions - may/may not satisfy the partition criteria
- private Set<Partition> unknownPartns;
+ /** Whether there are partitions in the list that may or may not satisfy the criteria. */
+ private boolean hasUnknowns;
- // denied partitions - do not satisfy the partition criteria
- private final Set<Partition> deniedPartns;
-
- /**
- * @param confirmedPartns
- * confirmed partitions
- * @param unknownPartns
- * unknown partitions
- */
- public PrunedPartitionList(Table source, Set<Partition> confirmedPartns,
- Set<Partition> unknownPartns, Set<Partition> deniedPartns) {
+ public PrunedPartitionList(Table source, Set<Partition> partitions, boolean hasUnknowns) {
this.source = source;
- this.confirmedPartns = confirmedPartns;
- this.unknownPartns = unknownPartns;
- this.deniedPartns = deniedPartns;
+ this.partitions = partitions;
+ this.hasUnknowns = hasUnknowns;
}
public Table getSourceTable() {
@@ -61,59 +50,24 @@ public class PrunedPartitionList {
}
/**
- * get confirmed partitions.
- *
- * @return confirmedPartns confirmed paritions
- */
- public Set<Partition> getConfirmedPartns() {
- return confirmedPartns;
- }
-
- /**
- * get unknown partitions.
- *
- * @return unknownPartns unknown paritions
+ * @return partitions
*/
- public Set<Partition> getUnknownPartns() {
- return unknownPartns;
+ public Set<Partition> getPartitions() {
+ return partitions;
}
- /**
- * get denied partitions.
- *
- * @return deniedPartns denied paritions
- */
- public Set<Partition> getDeniedPartns() {
- return deniedPartns;
- }
/**
- * return all not-denied(confirmed + unknown) partitions.
+ * @return all partitions.
*/
public List<Partition> getNotDeniedPartns() {
- List<Partition> partitions = new ArrayList<Partition>();
- partitions.addAll(confirmedPartns);
- partitions.addAll(unknownPartns);
- return partitions;
- }
-
- /**
- * set confirmed partitions.
- *
- * @param confirmedPartns
- * confirmed paritions
- */
- public void setConfirmedPartns(Set<Partition> confirmedPartns) {
- this.confirmedPartns = confirmedPartns;
+ return new ArrayList<Partition>(partitions);
}
/**
- * set unknown partitions.
- *
- * @param unknownPartns
- * unknown partitions
+ * @return Whether there are unknown partitions in {@link #getPartitions()} result.
*/
- public void setUnknownPartns(Set<Partition> unknownPartns) {
- this.unknownPartns = unknownPartns;
+ public boolean hasUnknownPartitions() {
+ return hasUnknowns;
}
}
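With the simplified result type above, callers no longer juggle confirmed/unknown/denied sets; they iterate one partition set and consult a single flag. A hypothetical consumer fragment follows (variable names are placeholders; only the prune(ts, this, alias) entry point and the two accessors shown in this patch are assumed).

// Hypothetical consumer fragment; 'tableScanOp', 'parseContext' and 'alias' are placeholders.
PrunedPartitionList pruned = PartitionPruner.prune(tableScanOp, parseContext, alias);
for (Partition part : pruned.getPartitions()) {
  // plan work for every partition that is, or may be, needed
}
if (pruned.hasUnknownPartitions()) {
  // compile-time pruning was inexact: the residual predicate still has to be
  // evaluated against rows read from these partitions at execution time
}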
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java Fri Aug 16 01:21:54 2013
@@ -66,7 +66,7 @@ public class ExprNodeGenericFuncDesc ext
* This class uses a writableObjectInspector rather than a TypeInfo to store
* the canonical type information for this NodeDesc.
*/
- private ObjectInspector writableObjectInspector;
+ private transient ObjectInspector writableObjectInspector;
//Is this an expression that should perform a comparison for sorted searches
private boolean isSortedExpr;
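Marking writableObjectInspector transient keeps the cached inspector out of the serialized plan so it can be re-derived on the receiving side. The standalone demo below shows the basic Java serialization behavior the keyword relies on (plain JDK streams, illustrative only; Hive's actual plan serialization path may differ).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Minimal JDK-serialization demo (illustrative only): a transient field is skipped on write
 *  and comes back as null, which is why such state must be re-derived after deserialization. */
public class TransientFieldSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  private String kept = "kept";
  private transient String skipped = "skipped";

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ObjectOutputStream out = new ObjectOutputStream(bytes);
    out.writeObject(new TransientFieldSketch());
    out.close();
    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    TransientFieldSketch copy = (TransientFieldSketch) in.readObject();
    System.out.println(copy.kept + " / " + copy.skipped);  // prints: kept / null
  }
}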
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java Fri Aug 16 01:21:54 2013
@@ -21,10 +21,11 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.parse.SplitSample;
/**
@@ -193,6 +194,36 @@ public class FetchWork implements Serial
}
/**
+ * Get Partition descriptors in sorted (ascending) order of partition directory
+ *
+ * @return the partDesc array list
+ */
+ @Explain(displayName = "Partition Description", normalExplain = false)
+ public ArrayList<PartitionDesc> getPartDescOrderedByPartDir() {
+ ArrayList<PartitionDesc> partDescOrdered = partDesc;
+
+ if (partDir != null && partDir.size() > 1) {
+ if (partDesc == null || partDir.size() != partDesc.size()) {
+ throw new RuntimeException(
+ "Partiton Directory list size doesn't match Partition Descriptor list size");
+ }
+
+ // Construct a sorted Map of Partition Dir - Partition Descriptor; ordering is based on
+ // partition dir (map key)
+ // Assumption: there is a 1-1 mapping between partition dir and partition descriptor lists
+ TreeMap<String, PartitionDesc> partDirToPartSpecMap = new TreeMap<String, PartitionDesc>();
+ for (int i = 0; i < partDir.size(); i++) {
+ partDirToPartSpecMap.put(partDir.get(i), partDesc.get(i));
+ }
+
+ // Extract partition desc from sorted map (ascending order of part dir)
+ partDescOrdered = new ArrayList<PartitionDesc>(partDirToPartSpecMap.values());
+ }
+
+ return partDescOrdered;
+ }
+
+ /**
* @return the partDescs for paths
*/
public List<PartitionDesc> getPartDescs(List<Path> paths) {
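getPartDescOrderedByPartDir above relies on a TreeMap to reorder one list by the sort order of a parallel key list (and, as its comment notes, assumes a 1-1 mapping and matching sizes). The self-contained sketch below isolates that trick with plain strings; the directory and descriptor values are made up.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Standalone illustration (not Hive code) of the TreeMap trick used above:
 *  reorder one list by the ascending order of a parallel "key" list. */
public class SortParallelListsSketch {
  static <V> List<V> orderValuesByKeys(List<String> keys, List<V> values) {
    if (keys.size() != values.size()) {
      throw new IllegalArgumentException("parallel lists must have equal sizes");
    }
    TreeMap<String, V> sorted = new TreeMap<String, V>();  // iterates keys in ascending order
    for (int i = 0; i < keys.size(); i++) {
      sorted.put(keys.get(i), values.get(i));              // assumes keys are unique
    }
    return new ArrayList<V>(sorted.values());
  }

  public static void main(String[] args) {
    List<String> dirs = Arrays.asList("/warehouse/t/ds=03", "/warehouse/t/ds=01", "/warehouse/t/ds=02");
    List<String> descs = Arrays.asList("desc-03", "desc-01", "desc-02");
    System.out.println(orderValuesByKeys(dirs, descs));    // [desc-01, desc-02, desc-03]
  }
}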
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/ListBucketingCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/ListBucketingCtx.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/ListBucketingCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/ListBucketingCtx.java Fri Aug 16 01:21:54 2013
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
@@ -40,7 +39,7 @@ public class ListBucketingCtx implements
private static final long serialVersionUID = 1L;
private List<String> skewedColNames;
private List<List<String>> skewedColValues;
- private Map<SkewedValueList, String> lbLocationMap;
+ private Map<List<String>, String> lbLocationMap;
private List<SkewedColumnPositionPair> rowSkewedIndex;
private boolean isStoredAsSubDirectories;
private String defaultKey;
@@ -83,14 +82,14 @@ public class ListBucketingCtx implements
/**
* @return the lbLocationMap
*/
- public Map<SkewedValueList, String> getLbLocationMap() {
+ public Map<List<String>, String> getLbLocationMap() {
return lbLocationMap;
}
/**
* @param lbLocationMap the lbLocationMap to set
*/
- public void setLbLocationMap(Map<SkewedValueList, String> lbLocationMap) {
+ public void setLbLocationMap(Map<List<String>, String> lbLocationMap) {
this.lbLocationMap = lbLocationMap;
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Fri Aug 16 01:21:54 2013
@@ -137,6 +137,10 @@ public class TableScanDesc extends Abstr
this.virtualCols.addAll(virtualCols);
}
+ public boolean hasVirtualCols() {
+ return virtualCols != null && !virtualCols.isEmpty();
+ }
+
public void setStatsAggPrefix(String k) {
statsAggKeyPrefix = k;
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java Fri Aug 16 01:21:54 2013
@@ -317,15 +317,11 @@ public class StorageBasedAuthorizationPr
}
private HiveException hiveException(Exception e) {
- HiveException ex = new HiveException(e);
- ex.initCause(e);
- return ex;
+ return new HiveException(e);
}
private AuthorizationException authorizationException(Exception e) {
- AuthorizationException ex = new AuthorizationException(e);
- ex.initCause(e);
- return ex;
+ return new AuthorizationException(e);
}
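Dropping the extra initCause calls above is more than cosmetic: once a Throwable's cause has been set through its constructor, java.lang.Throwable.initCause rejects a second assignment with IllegalStateException. Whether the old Hive wrappers actually hit this depends on the HiveException and AuthorizationException constructors, but the plain-JDK demo below shows the underlying rule.

/** Plain-JDK demo (no Hive classes) of why a second cause assignment is rejected. */
public class InitCauseSketch {
  public static void main(String[] args) {
    Exception root = new Exception("root");
    Exception wrapped = new Exception(root);   // cause is recorded by the constructor
    try {
      wrapped.initCause(root);                 // attempt to set the cause again
    } catch (IllegalStateException expected) {
      System.out.println("initCause rejected: " + expected.getMessage());
    }
  }
}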
private static AccessControlException accessControlException(