Posted to commits@hive.apache.org by am...@apache.org on 2013/04/05 12:34:11 UTC
svn commit: r1464915 [3/5] - in /hive/branches/HIVE-4115: ./
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/
hcatalog/build-support/ant/ hcatalog/historical/branches/
hcatalog/historical/sit...
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Apr 5 10:34:08 2013
@@ -35,6 +35,8 @@ import java.util.regex.PatternSyntaxExce
import org.antlr.runtime.tree.BaseTree;
import org.antlr.runtime.tree.Tree;
+import org.antlr.runtime.tree.TreeWizard;
+import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
@@ -80,6 +82,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
@@ -117,6 +120,24 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFInputSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionedTableFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.CurrentRowSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.ValueBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFrameSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -152,6 +173,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.OrderExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -198,6 +223,8 @@ public class SemanticAnalyzer extends Ba
private Map<JoinOperator, QBJoinTree> joinContext;
private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
private final HashMap<TableScanOperator, Table> topToTable;
+ private final Map<FileSinkOperator, Table> fsopToTable;
+ private final List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
private QB qb;
private ASTNode ast;
private int destTableId;
@@ -259,6 +286,8 @@ public class SemanticAnalyzer extends Ba
joinContext = new HashMap<JoinOperator, QBJoinTree>();
smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
topToTable = new HashMap<TableScanOperator, Table>();
+ fsopToTable = new HashMap<FileSinkOperator, Table>();
+ reduceSinkOperatorsAddedByEnforceBucketingSorting = new ArrayList<ReduceSinkOperator>();
destTableId = 1;
uCtx = null;
listMapJoinOpsNoReducer = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
@@ -317,11 +346,14 @@ public class SemanticAnalyzer extends Ba
public ParseContext getParseContext() {
return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
- topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, loadTableWork,
+ topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable,
+ fsopToTable, loadTableWork,
loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
- opToPartToSkewedPruner, viewAliasToInput);
+ opToPartToSkewedPruner, viewAliasToInput,
+ reduceSinkOperatorsAddedByEnforceBucketingSorting,
+ queryProperties);
}
@SuppressWarnings("nls")
@@ -332,7 +364,9 @@ public class SemanticAnalyzer extends Ba
switch (ast.getToken().getType()) {
case HiveParser.TOK_QUERY: {
QB qb = new QB(id, alias, true);
- doPhase1(ast, qb, initPhase1Ctx());
+ Phase1Ctx ctx_1 = initPhase1Ctx();
+ doPhase1(ast, qb, ctx_1);
+
qbexpr.setOpcode(QBExpr.Opcode.NULLOP);
qbexpr.setQB(qb);
}
@@ -358,13 +392,28 @@ public class SemanticAnalyzer extends Ba
}
private LinkedHashMap<String, ASTNode> doPhase1GetAggregationsFromSelect(
- ASTNode selExpr) {
+ ASTNode selExpr, QB qb, String dest) {
+
// Iterate over the selects search for aggregation Trees.
// Use String as keys to eliminate duplicate trees.
LinkedHashMap<String, ASTNode> aggregationTrees = new LinkedHashMap<String, ASTNode>();
for (int i = 0; i < selExpr.getChildCount(); ++i) {
- ASTNode sel = (ASTNode) selExpr.getChild(i).getChild(0);
- doPhase1GetAllAggregations(sel, aggregationTrees);
+ ASTNode sel = (ASTNode) selExpr.getChild(i);
+ doPhase1GetAllAggregations((ASTNode) sel.getChild(0), aggregationTrees);
+ }
+
+ /*
+ * remove any aggregation to be handled by Windowing.
+ */
+ if ( queryProperties.hasWindowing() && qb.getWindowingSpec(dest) != null ) {
+ HashMap<String, ASTNode> aliasToWdwExprs = qb.getParseInfo().getWindowingExprsForClause(dest);
+ LinkedHashMap<String, ASTNode> aggTreesMinusWindowing = new LinkedHashMap<String, ASTNode>();
+ for(Map.Entry<String,ASTNode> entry : aggregationTrees.entrySet()) {
+ if ( !aliasToWdwExprs.containsKey(entry.getKey())) {
+ aggTreesMinusWindowing.put(entry.getKey(), entry.getValue());
+ }
+ }
+ aggregationTrees = aggTreesMinusWindowing;
}
return aggregationTrees;
}
@@ -650,6 +699,17 @@ public class SemanticAnalyzer extends Ba
processTable(qb, child);
} else if (child.getToken().getType() == HiveParser.TOK_SUBQUERY) {
processSubQuery(qb, child);
+ } else if (child.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) {
+ queryProperties.setHasPTF(true);
+ processPTF(qb, child);
+ PTFInvocationSpec ptfInvocationSpec = qb.getPTFInvocationSpec(child);
+ String inputAlias = ptfInvocationSpec == null ? null :
+ ((PartitionedTableFunctionSpec)ptfInvocationSpec.getFunction()).getAlias();
+ if ( inputAlias == null ) {
+ throw new SemanticException(generateErrorMessage(child,
+ "PTF invocation in a Join must have an alias"));
+ }
+
} else if (child.getToken().getType() == HiveParser.TOK_LATERAL_VIEW) {
// SELECT * FROM src1 LATERAL VIEW udtf() AS myTable JOIN src2 ...
// is not supported. Instead, the lateral view must be in a subquery
@@ -741,11 +801,14 @@ public class SemanticAnalyzer extends Ba
qbp.setHints((ASTNode) ast.getChild(0));
}
- LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast);
+ handleWindowingExprsInSelectList(qb, ctx_1.dest, ast);
+
+ LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast,
+ qb, ctx_1.dest);
doPhase1GetColumnAliasesFromSelect(ast, qbp);
qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
qbp.setDistinctFuncExprsForClause(ctx_1.dest,
- doPhase1GetDistinctFuncExprs(aggregations));
+ doPhase1GetDistinctFuncExprs(aggregations));
break;
case HiveParser.TOK_WHERE:
@@ -770,7 +833,6 @@ public class SemanticAnalyzer extends Ba
.getMsg(ast));
}
}
-
qbp.setDestForClause(ctx_1.dest, (ASTNode) ast.getChild(0));
break;
@@ -793,6 +855,9 @@ public class SemanticAnalyzer extends Ba
queryProperties.setHasJoin(true);
processJoin(qb, frm);
qbp.setJoinExpr(frm);
+ }else if(frm.getToken().getType() == HiveParser.TOK_PTBLFUNCTION){
+ queryProperties.setHasPTF(true);
+ processPTF(qb, frm);
}
break;
@@ -819,7 +884,7 @@ public class SemanticAnalyzer extends Ba
break;
case HiveParser.TOK_SORTBY:
- // Get the sort by aliases - these are aliased to the entries in the
+ // Get the sort by aliases - these are aliased to the entries in the
// select list
queryProperties.setHasSortBy(true);
qbp.setSortByExprForClause(ctx_1.dest, ast);
@@ -873,7 +938,16 @@ public class SemanticAnalyzer extends Ba
case HiveParser.TOK_HAVING:
qbp.setHavingExprForClause(ctx_1.dest, ast);
- qbp.addAggregationExprsForClause(ctx_1.dest, doPhase1GetAggregationsFromSelect(ast));
+ qbp.addAggregationExprsForClause(ctx_1.dest,
+ doPhase1GetAggregationsFromSelect(ast, qb, ctx_1.dest));
+ break;
+
+ case HiveParser.KW_WINDOW:
+ if (!qb.hasWindowingSpec(ctx_1.dest) ) {
+ throw new SemanticException(generateErrorMessage(ast,
+ "Query has no Cluster/Distribute By; but has a Window definition"));
+ }
+ handleQueryWindowClauses(qb, ctx_1, ast);
break;
case HiveParser.TOK_LIMIT:
@@ -2193,12 +2267,20 @@ public class SemanticAnalyzer extends Ba
List<ASTNode> result = new ArrayList<ASTNode>(selectExprs == null ? 0
: selectExprs.getChildCount());
if (selectExprs != null) {
+ HashMap<String, ASTNode> windowingExprs = parseInfo.getWindowingExprsForClause(dest);
+
for (int i = 0; i < selectExprs.getChildCount(); ++i) {
if (((ASTNode) selectExprs.getChild(i)).getToken().getType() == HiveParser.TOK_HINTLIST) {
continue;
}
// table.column AS alias
ASTNode grpbyExpr = (ASTNode) selectExprs.getChild(i).getChild(0);
+ /*
+ * If this is handled by Windowing then ignore it.
+ */
+ if (windowingExprs != null && windowingExprs.containsKey(grpbyExpr.toStringTree())) {
+ continue;
+ }
result.add(grpbyExpr);
}
}
@@ -2225,7 +2307,10 @@ public class SemanticAnalyzer extends Ba
String tabAlias = null;
String[] colRef = new String[2];
- if (selExpr.getChildCount() == 2) {
+ //for queries with a windowing expressions, the selexpr may have a third child
+ if (selExpr.getChildCount() == 2 ||
+ (selExpr.getChildCount() == 3 &&
+ selExpr.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC)) {
// return zz for "xx + yy AS zz"
colAlias = unescapeIdentifier(selExpr.getChild(1).getText());
colRef[0] = tabAlias;
@@ -2302,6 +2387,7 @@ public class SemanticAnalyzer extends Ba
return false;
}
+
private Operator<?> genSelectPlan(String dest, QB qb, Operator<?> input)
throws SemanticException {
ASTNode selExprList = qb.getParseInfo().getSelForClause(dest);
@@ -2439,11 +2525,14 @@ public class SemanticAnalyzer extends Ba
// child can be EXPR AS ALIAS, or EXPR.
ASTNode child = (ASTNode) exprList.getChild(i);
boolean hasAsClause = (!isInTransform) && (child.getChildCount() == 2);
+ boolean isWindowSpec = child.getChildCount() == 3 ?
+ (child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC) :
+ false;
// EXPR AS (ALIAS,...) parses, but is only allowed for UDTF's
// This check is not needed and invalid when there is a transform b/c the
// AST's are slightly different.
- if (!isInTransform && !isUDTF && child.getChildCount() > 2) {
+ if (!isWindowSpec && !isInTransform && !isUDTF && child.getChildCount() > 2) {
throw new SemanticException(generateErrorMessage(
(ASTNode) child.getChild(2),
ErrorMsg.INVALID_AS.getMsg()));
@@ -5180,6 +5269,7 @@ public class SemanticAnalyzer extends Ba
+ dest_path + " row schema: " + inputRR.toString());
}
+ fsopToTable.put((FileSinkOperator) output, dest_tab);
return output;
}
@@ -5587,6 +5677,7 @@ public class SemanticAnalyzer extends Ba
partitionCols, order.toString(), numReducers),
new RowSchema(inputRR.getColumnInfos()), input), inputRR);
interim.setColumnExprMap(colExprMap);
+ reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator) interim);
// Add the extract operator to get the value fields
RowResolver out_rwsch = new RowResolver();
@@ -5609,6 +5700,7 @@ public class SemanticAnalyzer extends Ba
LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() +
" row schema: " + out_rwsch.toString());
}
+
return output;
}
@@ -6290,12 +6382,18 @@ public class SemanticAnalyzer extends Ba
ASTNode right = (ASTNode) joinParseTree.getChild(1);
if ((left.getToken().getType() == HiveParser.TOK_TABREF)
- || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
+ || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)
+ || (left.getToken().getType() == HiveParser.TOK_PTBLFUNCTION)) {
String tableName = getUnescapedUnqualifiedTableName((ASTNode) left.getChild(0))
.toLowerCase();
String alias = left.getChildCount() == 1 ? tableName
: unescapeIdentifier(left.getChild(left.getChildCount() - 1)
- .getText().toLowerCase());
+ .getText().toLowerCase());
+ // ptf node form is: ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+ // guaranteed to have an alias here: check done in processJoin
+ alias = (left.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) ?
+ unescapeIdentifier(left.getChild(1).getText().toLowerCase()) :
+ alias;
joinTree.setLeftAlias(alias);
String[] leftAliases = new String[1];
leftAliases[0] = alias;
@@ -6321,12 +6419,18 @@ public class SemanticAnalyzer extends Ba
}
if ((right.getToken().getType() == HiveParser.TOK_TABREF)
- || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
+ || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)
+ || (right.getToken().getType() == HiveParser.TOK_PTBLFUNCTION)) {
String tableName = getUnescapedUnqualifiedTableName((ASTNode) right.getChild(0))
.toLowerCase();
String alias = right.getChildCount() == 1 ? tableName
: unescapeIdentifier(right.getChild(right.getChildCount() - 1)
- .getText().toLowerCase());
+ .getText().toLowerCase());
+ // ptf node form is: ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+ // guaranteed to have an alias here: check done in processJoin
+ alias = (right.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) ?
+ unescapeIdentifier(right.getChild(1).getText().toLowerCase()) :
+ alias;
String[] rightAliases = new String[1];
rightAliases[0] = alias;
joinTree.setRightAliases(rightAliases);
@@ -6430,8 +6534,10 @@ public class SemanticAnalyzer extends Ba
joinTree.setStreamAliases(streamAliases);
}
- private void mergeJoins(QB qb, QBJoinTree parent, QBJoinTree node,
- QBJoinTree target, int pos) {
+ /**
+ * Merges node to target
+ */
+ private void mergeJoins(QB qb, QBJoinTree node, QBJoinTree target, int pos) {
String[] nodeRightAliases = node.getRightAliases();
String[] trgtRightAliases = target.getRightAliases();
String[] rightAliases = new String[nodeRightAliases.length
@@ -6516,12 +6622,6 @@ public class SemanticAnalyzer extends Ba
filterPos.addAll(node.getFiltersForPushing().get(0));
}
- if (qb.getQbJoinTree() == node) {
- qb.setQbJoinTree(node.getJoinSrc());
- } else {
- parent.setJoinSrc(node.getJoinSrc());
- }
-
if (node.getNoOuterJoin() && target.getNoOuterJoin()) {
target.setNoOuterJoin(true);
} else {
@@ -6606,49 +6706,81 @@ public class SemanticAnalyzer extends Ba
return res;
}
- private boolean mergeJoinNodes(QB qb, QBJoinTree parent, QBJoinTree node,
- QBJoinTree target) {
- if (target == null) {
- return false;
+ // try to merge the join tree starting from the inner-most source
+ // (it was merged from outer-most to inner, which could be invalid)
+ //
+ // in a join tree ((A-B)-C)-D where C is not mergeable with A-B,
+ // D can be merged with A-B into a single join if and only if C and D have the same join type.
+ // In this case, the A-B-D join will be executed first and the ABD-C join will be executed next.
+ private void mergeJoinTree(QB qb) {
+ QBJoinTree tree = qb.getQbJoinTree();
+ if (tree.getJoinSrc() == null) {
+ return;
}
- if (!node.getNoOuterJoin() || !target.getNoOuterJoin()) {
- // todo 8 way could be not enough number
- if (node.getLeftAliases().length + node.getRightAliases().length + 1 >= 32) {
- LOG.info(ErrorMsg.JOINNODE_OUTERJOIN_MORETHAN_32);
- return false;
+ // make array with QBJoinTree : outer most(0) --> inner most(n)
+ List<QBJoinTree> trees = new ArrayList<QBJoinTree>();
+ for (;tree != null; tree = tree.getJoinSrc()) {
+ trees.add(tree);
+ }
+ // merging from 'target'(inner) to 'node'(outer)
+ for (int i = trees.size() - 1; i >= 0; i--) {
+ QBJoinTree target = trees.get(i);
+ if (target == null) {
+ continue;
+ }
+ JoinType prevType = null; // save join type
+ for (int j = i - 1; j >= 0; j--) {
+ QBJoinTree node = trees.get(j);
+ if (node == null) {
+ continue;
+ }
+ JoinType currType = getType(node.getJoinCond());
+ if (prevType != null && prevType != currType) {
+ break;
+ }
+ int pos = findMergePos(node, target);
+ if (pos >= 0) {
+ // for outer joins, it should not exceed 16 aliases (short type)
+ if (!node.getNoOuterJoin() || !target.getNoOuterJoin()) {
+ if (node.getRightAliases().length + target.getRightAliases().length + 1 > 16) {
+ LOG.info(ErrorMsg.JOINNODE_OUTERJOIN_MORETHAN_16);
+ continue;
+ }
+ }
+ mergeJoins(qb, node, target, pos);
+ trees.set(j, null);
+ continue; // continue merging with next alias
+ }
+ if (prevType == null) {
+ prevType = currType;
+ }
}
}
- int res = findMergePos(node, target);
- if (res != -1) {
- mergeJoins(qb, parent, node, target, res);
- return true;
+ // reconstruct join tree
+ QBJoinTree current = null;
+ for (int i = 0; i < trees.size(); i++) {
+ QBJoinTree target = trees.get(i);
+ if (target == null) {
+ continue;
+ }
+ if (current == null) {
+ qb.setQbJoinTree(current = target);
+ } else {
+ current.setJoinSrc(target);
+ current = target;
+ }
}
-
- return mergeJoinNodes(qb, parent, node, target.getJoinSrc());
}
- private void mergeJoinTree(QB qb) {
- QBJoinTree root = qb.getQbJoinTree();
- QBJoinTree parent = null;
- while (root != null) {
- boolean merged = mergeJoinNodes(qb, parent, root, root.getJoinSrc());
-
- if (parent == null) {
- if (merged) {
- root = qb.getQbJoinTree();
- } else {
- parent = root;
- root = root.getJoinSrc();
- }
- } else {
- if (merged) {
- root = root.getJoinSrc();
- } else {
- parent = parent.getJoinSrc();
- root = parent.getJoinSrc();
- }
+ // Join types should be all the same for merging (or returns null)
+ private JoinType getType(JoinCond[] conds) {
+ JoinType type = conds[0].getJoinType();
+ for (int k = 1; k < conds.length; k++) {
+ if (type != conds[k].getJoinType()) {
+ return null;
}
}
+ return type;
}
private Operator insertSelectAllPlanForGroupBy(Operator input)
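For illustration, the inner-most-first merge described in the comment block above can be sketched standalone. The Node class, its toy mergeableInto rule, and the plain join-type strings are hypothetical stand-ins for QBJoinTree, findMergePos and JoinType; this is only the shape of the algorithm, not Hive code.

import java.util.ArrayList;
import java.util.List;

class JoinMergeSketch {
  // Hypothetical stand-in for QBJoinTree: a chain entry with a join type
  // and a toy rule for whether it can be merged into a given target.
  static class Node {
    final String name;
    final String joinType;
    Node(String name, String joinType) { this.name = name; this.joinType = joinType; }
    boolean mergeableInto(Node target) { return !name.equals("C"); } // toy rule: C never merges
  }

  // Merge from inner-most (end of list) to outer-most; a target stops scanning
  // further out once a differing join type is seen, as in mergeJoinTree above.
  static List<Node> merge(List<Node> chain) {
    for (int i = chain.size() - 1; i >= 0; i--) {
      Node target = chain.get(i);
      if (target == null) continue;
      String prevType = null;
      for (int j = i - 1; j >= 0; j--) {
        Node node = chain.get(j);
        if (node == null) continue;
        if (prevType != null && !prevType.equals(node.joinType)) break;
        if (node.mergeableInto(target)) { chain.set(j, null); continue; }
        if (prevType == null) prevType = node.joinType;
      }
    }
    List<Node> survivors = new ArrayList<>();   // relink the nodes that were not merged away
    for (Node n : chain) if (n != null) survivors.add(n);
    return survivors;
  }

  public static void main(String[] args) {
    // ((A-B)-C)-D stored outer-most first: D, C, A-B
    List<Node> chain = new ArrayList<>(List.of(
        new Node("D", "INNER"), new Node("C", "INNER"), new Node("AB", "INNER")));
    System.out.println(merge(chain).size()); // 2: D is merged into A-B, C stays separate
  }
}

Run on the ((A-B)-C)-D shape from the comment, the sketch leaves two entries: the merged A-B-D join and the unmergeable C join, matching the execution order the comment describes.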
@@ -7109,6 +7241,11 @@ public class SemanticAnalyzer extends Ba
curr = genHavingPlan(dest, qb, curr);
}
+
+ if(queryProperties.hasWindowing() && qb.getWindowingSpec(dest) != null) {
+ curr = genWindowingPlan(qb.getWindowingSpec(dest), curr);
+ }
+
curr = genSelectPlan(dest, qb, curr);
Integer limit = qbp.getDestLimit(dest);
@@ -7148,6 +7285,7 @@ public class SemanticAnalyzer extends Ba
curr = genReduceSinkPlan(dest, qb, curr, numReducers);
}
+
if (qbp.getIsSubQ()) {
if (limit != null) {
// In case of order by, only 1 reducer is used, so no need of
@@ -7368,17 +7506,21 @@ public class SemanticAnalyzer extends Ba
}
RowResolver rowResolver = new RowResolver();
+ Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
+
List<String> colName = new ArrayList<String>();
for (int i = 0; i < columns.size(); i++) {
String name = getColumnInternalName(i);
- rowResolver.put(origInputAlias, name, new ColumnInfo(name, columns.get(i)
- .getTypeInfo(), "", false));
+ ColumnInfo col = new ColumnInfo(name, columns.get(i)
+ .getTypeInfo(), "", false);
+ rowResolver.put(origInputAlias, name, col);
colName.add(name);
+ columnExprMap.put(name, columns.get(i));
}
Operator<SelectDesc> newInputOp = OperatorFactory.getAndMakeChild(
new SelectDesc(columns, colName), new RowSchema(rowResolver.getColumnInfos()),
- origInputOp);
+ columnExprMap, origInputOp);
return putOpInsertMap(newInputOp, rowResolver);
}
@@ -7793,11 +7935,38 @@ public class SemanticAnalyzer extends Ba
aliasToOpInfo.put(alias, op);
}
+ Operator srcOpInfo = null;
+ Operator lastPTFOp = null;
+
+ if(queryProperties.hasPTF()){
+ //After processing subqueries and source tables, process
+ // partitioned table functions
+
+ HashMap<ASTNode, PTFInvocationSpec> ptfNodeToSpec = qb.getPTFNodeToSpec();
+ if ( ptfNodeToSpec != null ) {
+ for(Entry<ASTNode, PTFInvocationSpec> entry : ptfNodeToSpec.entrySet()) {
+ ASTNode ast = entry.getKey();
+ PTFInvocationSpec spec = entry.getValue();
+ String inputAlias = spec.getQueryInputName();
+ Operator inOp = aliasToOpInfo.get(inputAlias);
+ if ( inOp == null ) {
+ throw new SemanticException(generateErrorMessage(ast,
+ "Cannot resolve input Operator for PTF invocation"));
+ }
+ lastPTFOp = genPTFPlan(spec, inOp);
+ String ptfAlias = ((PartitionedTableFunctionSpec)spec.getFunction()).getAlias();
+ if ( ptfAlias != null ) {
+ aliasToOpInfo.put(ptfAlias, lastPTFOp);
+ }
+ }
+ }
+
+ }
+
// For all the source tables that have a lateral view, attach the
// appropriate operators to the TS
genLateralViewPlans(aliasToOpInfo, qb);
- Operator srcOpInfo = null;
// process join
if (qb.getParseInfo().getJoinExpr() != null) {
@@ -7820,6 +7989,9 @@ public class SemanticAnalyzer extends Ba
// Now if there are more than 1 sources then we have a join case
// later we can extend this to the union all case as well
srcOpInfo = aliasToOpInfo.values().iterator().next();
+ // with ptfs, there may be more (note for PTFChains:
+ // 1 ptf invocation may entail multiple PTF operators)
+ srcOpInfo = lastPTFOp != null ? lastPTFOp : srcOpInfo;
}
Operator bodyOpInfo = genBodyPlan(qb, srcOpInfo);
@@ -8478,7 +8650,8 @@ public class SemanticAnalyzer extends Ba
}
// continue analyzing from the child ASTNode.
- if (!doPhase1(child, qb, initPhase1Ctx())) {
+ Phase1Ctx ctx_1 = initPhase1Ctx();
+ if (!doPhase1(child, qb, ctx_1)) {
// if phase1Result false return
return;
}
@@ -8515,11 +8688,12 @@ public class SemanticAnalyzer extends Ba
ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext,
- topToTable,
+ topToTable, fsopToTable,
loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
- opToPartToSkewedPruner, viewAliasToInput);
+ opToPartToSkewedPruner, viewAliasToInput,
+ reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
// Generate table access stats if required
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {
@@ -9672,4 +9846,1040 @@ public class SemanticAnalyzer extends Ba
public void setQB(QB qb) {
this.qb = qb;
}
+
+//--------------------------- PTF handling -----------------------------------
+
+ /*
+ * - a partitionTableFunctionSource can be a tableReference, a SubQuery or another
+ * PTF invocation.
+ * - For a TABLEREF: set the source to the alias returned by processTable
+ * - For a SubQuery: set the source to the alias returned by processSubQuery
+ * - For a PTF invocation: recursively call processPTFChain.
+ */
+ private PTFInputSpec processPTFSource(QB qb, ASTNode inputNode) throws SemanticException{
+
+ PTFInputSpec qInSpec = null;
+ int type = inputNode.getType();
+ String alias;
+ switch(type)
+ {
+ case HiveParser.TOK_TABREF:
+ alias = processTable(qb, inputNode);
+ qInSpec = new PTFQueryInputSpec();
+ ((PTFQueryInputSpec)qInSpec).setType(PTFQueryInputType.TABLE);
+ ((PTFQueryInputSpec)qInSpec).setSource(alias);
+ break;
+ case HiveParser.TOK_SUBQUERY:
+ alias = processSubQuery(qb, inputNode);
+ qInSpec = new PTFQueryInputSpec();
+ ((PTFQueryInputSpec)qInSpec).setType(PTFQueryInputType.SUBQUERY);
+ ((PTFQueryInputSpec)qInSpec).setSource(alias);
+ break;
+ case HiveParser.TOK_PTBLFUNCTION:
+ qInSpec = processPTFChain(qb, inputNode);
+ break;
+ default:
+ throw new SemanticException(generateErrorMessage(inputNode,
+ "Unknown input type to PTF"));
+ }
+
+ qInSpec.setAstNode(inputNode);
+ return qInSpec;
+
+ }
+
+ /*
+ * - tree form is
+ * ^(TOK_PTBLFUNCTION name alias? partitionTableFunctionSource partitioningSpec? arguments*)
+ * - a partitionTableFunctionSource can be a tableReference, a SubQuery or another
+ * PTF invocation.
+ */
+ private PartitionedTableFunctionSpec processPTFChain(QB qb, ASTNode ptf)
+ throws SemanticException{
+ int child_count = ptf.getChildCount();
+ if (child_count < 2) {
+ throw new SemanticException(generateErrorMessage(ptf,
+ "Not enough Children " + child_count));
+ }
+
+ PartitionedTableFunctionSpec ptfSpec = new PartitionedTableFunctionSpec();
+ ptfSpec.setAstNode(ptf);
+
+ /*
+ * name
+ */
+ ASTNode nameNode = (ASTNode) ptf.getChild(0);
+ ptfSpec.setName(nameNode.getText());
+
+ int inputIdx = 1;
+
+ /*
+ * alias
+ */
+ ASTNode secondChild = (ASTNode) ptf.getChild(1);
+ if ( secondChild.getType() == HiveParser.Identifier ) {
+ ptfSpec.setAlias(secondChild.getText());
+ inputIdx++;
+ }
+
+ /*
+ * input
+ */
+ ASTNode inputNode = (ASTNode) ptf.getChild(inputIdx);
+ ptfSpec.setInput(processPTFSource(qb, inputNode));
+
+ int argStartIdx = inputIdx + 1;
+
+ /*
+ * partitioning Spec
+ */
+ int pSpecIdx = inputIdx + 1;
+ ASTNode pSpecNode = ptf.getChildCount() > inputIdx ?
+ (ASTNode) ptf.getChild(pSpecIdx) : null;
+ if (pSpecNode != null && pSpecNode.getType() == HiveParser.TOK_PARTITIONINGSPEC)
+ {
+ PartitioningSpec partitioning = processPTFPartitionSpec(pSpecNode);
+ ptfSpec.setPartitioning(partitioning);
+ argStartIdx++;
+ }
+
+ /*
+ * arguments
+ */
+ for(int i=argStartIdx; i < ptf.getChildCount(); i++)
+ {
+ ptfSpec.addArg((ASTNode) ptf.getChild(i));
+ }
+ return ptfSpec;
+ }
+
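processPTFChain above walks the children of ^(TOK_PTBLFUNCTION name alias? partitionTableFunctionSource partitioningSpec? arguments*) with a moving index because both the alias and the partitioning spec are optional. A minimal sketch of that optional-slot walk over a flat token list; the strings and the looksLikeIdentifier check are hypothetical stand-ins for the ANTLR child nodes and the HiveParser.Identifier test.

import java.util.List;

class PtfChildWalkSketch {
  public static void main(String[] args) {
    // Hypothetical flattened child list for: foo(...) aliased as "t", over a source,
    // with a partitioning spec and two arguments.
    List<String> children = List.of("foo", "t", "SOURCE", "PARTSPEC", "arg", "otherarg");

    String name = children.get(0);               // child 0 is always the function name
    int inputIdx = 1;
    String alias = null;
    if (looksLikeIdentifier(children.get(1))) {  // optional alias shifts the input position
      alias = children.get(1);
      inputIdx++;
    }
    String source = children.get(inputIdx);
    int argStartIdx = inputIdx + 1;
    boolean hasPartitioning = false;
    if (argStartIdx < children.size() && children.get(argStartIdx).equals("PARTSPEC")) {
      hasPartitioning = true;                    // optional partitioning spec shifts the args
      argStartIdx++;
    }
    List<String> fnArgs = children.subList(argStartIdx, children.size());

    System.out.println(name + " as " + alias + " over " + source
        + " partitioned=" + hasPartitioning + " args=" + fnArgs);
  }

  static boolean looksLikeIdentifier(String s) {
    return s.chars().allMatch(Character::isLowerCase);  // toy stand-in for the Identifier token check
  }
}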
+ /*
+ * - invoked during FROM AST tree processing, on encountering a PTF invocation.
+ * - tree form is
+ * ^(TOK_PTBLFUNCTION name partitionTableFunctionSource partitioningSpec? arguments*)
+ * - setup a PTFInvocationSpec for this top level PTF invocation.
+ */
+ private void processPTF(QB qb, ASTNode ptf) throws SemanticException{
+
+ PartitionedTableFunctionSpec ptfSpec = processPTFChain(qb, ptf);
+
+ if ( ptfSpec.getAlias() != null ) {
+ qb.addAlias(ptfSpec.getAlias());
+ }
+
+ PTFInvocationSpec spec = new PTFInvocationSpec();
+ spec.setFunction(ptfSpec);
+ qb.addPTFNodeToSpec(ptf, spec);
+ }
+
+//--------------------------- Windowing handling -----------------------------------
+
+ /*
+ * - A Select Item form is: ^(TOK_SELEXPR selectExpression Identifier* window_specification?)
+ * What makes a UDAF invocation a Windowing Function invocation:
+ * 1. It appears in a SelectExpr that has a WindowSpec
+ * 2. It is a UDAF that implies order (FunctionRegistry.impliesOrder)
+ * 3. It contains lead/lag UDF invocations in its args.
+ */
+ private boolean checkAndExtractWindowFunctionsInSelect(QB qb, ASTNode selectExpr, String dest)
+ throws SemanticException {
+
+ int childCount = selectExpr.getChildCount();
+ ASTNode windowSpec = (ASTNode) selectExpr.getChild(childCount - 1);
+
+ boolean hasWindowSpec = windowSpec.getType() == HiveParser.TOK_WINDOWSPEC;
+
+ ArrayList<ASTNode> functions =
+ extractWindowingUDAFs((ASTNode) selectExpr.getChild(0), !hasWindowSpec);
+ if ( functions.size() == 0 ) {
+ return false;
+ }
+
+ WindowingSpec spec = qb.getWindowingSpec(dest);
+ if(spec == null) {
+ queryProperties.setHasWindowing(true);
+ spec = new WindowingSpec();
+ qb.addDestToWindowingSpec(dest, spec);
+ }
+
+ HashMap<String, ASTNode> wExprsInDest = qb.getParseInfo().getWindowingExprsForClause(dest);
+ int wColIdx = spec.getWindowExpressions() == null ? 0 : spec.getWindowExpressions().size();
+ for(ASTNode function : functions) {
+ WindowFunctionSpec wFnSpec = processWindowFunction(function,
+ hasWindowSpec ? windowSpec : null);
+
+ /*
+ * If this is a duplicate invocation of a function, don't add it to the WindowingSpec.
+ */
+ if ( wExprsInDest != null &&
+ wExprsInDest.containsKey(wFnSpec.getExpression().toStringTree())) {
+ continue;
+ }
+ wFnSpec.setAlias("_wcol" + wColIdx++);
+ spec.addWindowFunction(wFnSpec);
+ qb.getParseInfo().addWindowingExprToClause(dest, wFnSpec.getExpression());
+ }
+ return true;
+ }
+
+ /*
+ * return the UDAFs within the expressionTree.
+ * If implyOrder is true, then only return the invocations that:
+ * - are for UDAFs that imply order (FunctionRegistry.impliesOrder)
+ * - or contain a Lead/Lag UDF invocation in their arguments
+ * If implyOrder is false, then return all UDAF invocations.
+ */
+ private ArrayList<ASTNode> extractWindowingUDAFs(ASTNode expressionTree, boolean implyOrder) {
+ ArrayList<ASTNode> aggregations = new ArrayList<ASTNode>();
+ extractWindowingUDAFs(expressionTree, aggregations);
+ if (!implyOrder) {
+ return aggregations;
+ }
+ ArrayList<ASTNode> wdwUDAFs = new ArrayList<ASTNode>();
+ for(ASTNode function : aggregations) {
+ String fnName = function.getChild(0).getText().toLowerCase();
+ if ( FunctionRegistry.impliesOrder(fnName)) {
+ wdwUDAFs.add(function);
+ continue;
+ }
+ boolean hasLLInArgs = false;
+ for(int i=1; i < function.getChildCount(); i++) {
+ ASTNode child = (ASTNode) function.getChild(i);
+ hasLLInArgs = containsLeadLagUDF(child);
+ if (hasLLInArgs) {
+ break;
+ }
+ }
+ if (hasLLInArgs) {
+ wdwUDAFs.add(function);
+ }
+ }
+ return wdwUDAFs;
+ }
+
+ private void extractWindowingUDAFs(ASTNode expressionTree,
+ ArrayList<ASTNode> aggregations) {
+ int exprTokenType = expressionTree.getToken().getType();
+ if (exprTokenType == HiveParser.TOK_FUNCTION
+ || exprTokenType == HiveParser.TOK_FUNCTIONDI
+ || exprTokenType == HiveParser.TOK_FUNCTIONSTAR) {
+ assert (expressionTree.getChildCount() != 0);
+ if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
+ String functionName = unescapeIdentifier(expressionTree.getChild(0)
+ .getText());
+ WindowFunctionInfo fi = FunctionRegistry.getWindowFunctionInfo(functionName);
+ if (fi != null) {
+ aggregations.add(expressionTree);
+ return;
+ }
+ }
+ }
+ for (int i = 0; i < expressionTree.getChildCount(); i++) {
+ extractWindowingUDAFs((ASTNode) expressionTree.getChild(i),
+ aggregations);
+ }
+ }
+
+ private boolean containsLeadLagUDF(ASTNode expressionTree) {
+ int exprTokenType = expressionTree.getToken().getType();
+ if (exprTokenType == HiveParser.TOK_FUNCTION) {
+ assert (expressionTree.getChildCount() != 0);
+ if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
+ String functionName = unescapeIdentifier(expressionTree.getChild(0)
+ .getText());
+ functionName = functionName.toLowerCase();
+ if ( FunctionRegistry.LAG_FUNC_NAME.equals(functionName) ||
+ FunctionRegistry.LEAD_FUNC_NAME.equals(functionName)
+ ) {
+ return true;
+ }
+ }
+ }
+ for (int i = 0; i < expressionTree.getChildCount(); i++) {
+ if ( containsLeadLagUDF((ASTNode) expressionTree.getChild(i))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
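containsLeadLagUDF above is a short-circuiting depth-first walk of the expression subtree. The same idea over a toy tree, with a hypothetical Expr class standing in for the ANTLR ASTNode:

import java.util.Arrays;
import java.util.List;

class LeadLagSearchSketch {
  // Hypothetical expression node: a name plus child expressions.
  static class Expr {
    final String name;
    final List<Expr> children;
    Expr(String name, Expr... children) { this.name = name; this.children = Arrays.asList(children); }
  }

  // Depth-first search that returns as soon as a lead/lag call is found.
  static boolean containsLeadLag(Expr e) {
    String fn = e.name.toLowerCase();
    if (fn.equals("lead") || fn.equals("lag")) {
      return true;
    }
    for (Expr child : e.children) {
      if (containsLeadLag(child)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // sum(price - lag(price, 1)): the lag in the arguments is what makes the
    // surrounding sum a windowing invocation per rule 3 above.
    Expr expr = new Expr("sum",
        new Expr("-", new Expr("price"), new Expr("lag", new Expr("price"), new Expr("1"))));
    System.out.println(containsLeadLag(expr)); // true
  }
}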
+ /*
+ * - Invoked during Phase1 when a TOK_SELECT is encountered.
+ * - Select tree form is: ^(TOK_SELECT ^(TOK_SELECTEXPR...) ^(TOK_SELECTEXPR...) ...)
+ * - A Select Item form is: ^(TOK_SELEXPR selectExpression Identifier* window_specification?)
+ *
+ * See checkAndExtractWindowFunctionsInSelect for rules on what makes a UDAF invocation
+ * a Windowing Function invocation
+ */
+ private void handleWindowingExprsInSelectList(QB qb, String dest, ASTNode selectNode)
+ throws SemanticException {
+ for(int i=0; i < selectNode.getChildCount(); i++)
+ {
+ ASTNode selectExpr = (ASTNode) selectNode.getChild(i);
+ if ( selectExpr.getType() != HiveParser.TOK_SELEXPR )
+ {
+ continue;
+ }
+ boolean hasWindowingExprs = checkAndExtractWindowFunctionsInSelect(qb, selectExpr, dest);
+
+ }
+ }
+
+ private void handleQueryWindowClauses(QB qb, Phase1Ctx ctx_1, ASTNode node)
+ throws SemanticException {
+ WindowingSpec spec = qb.getWindowingSpec(ctx_1.dest);
+ for(int i=0; i < node.getChildCount(); i++) {
+ processQueryWindowClause(spec, (ASTNode) node.getChild(i));
+ }
+ }
+
+ private PartitionSpec processPartitionSpec(ASTNode node) {
+ PartitionSpec pSpec = new PartitionSpec();
+ int exprCnt = node.getChildCount();
+ for(int i=0; i < exprCnt; i++) {
+ PartitionExpression exprSpec = new PartitionExpression();
+ exprSpec.setExpression((ASTNode) node.getChild(i));
+ pSpec.addExpression(exprSpec);
+ }
+ return pSpec;
+ }
+
+ private OrderSpec processOrderSpec(ASTNode sortNode) {
+ OrderSpec oSpec = new OrderSpec();
+ int exprCnt = sortNode.getChildCount();
+ for(int i=0; i < exprCnt; i++) {
+ OrderExpression exprSpec = new OrderExpression();
+ exprSpec.setExpression((ASTNode) sortNode.getChild(i).getChild(0));
+ if ( sortNode.getChild(i).getType() == HiveParser.TOK_TABSORTCOLNAMEASC ) {
+ exprSpec.setOrder(org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.ASC);
+ }
+ else {
+ exprSpec.setOrder(org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.DESC);
+ }
+ oSpec.addExpression(exprSpec);
+ }
+ return oSpec;
+ }
+
+ private PartitioningSpec processPTFPartitionSpec(ASTNode pSpecNode)
+ {
+ PartitioningSpec partitioning = new PartitioningSpec();
+ ASTNode firstChild = (ASTNode) pSpecNode.getChild(0);
+ int type = firstChild.getType();
+ int exprCnt;
+
+
+ if ( type == HiveParser.TOK_DISTRIBUTEBY || type == HiveParser.TOK_CLUSTERBY )
+ {
+ PartitionSpec pSpec = processPartitionSpec(firstChild);
+ partitioning.setPartSpec(pSpec);
+ ASTNode sortNode = pSpecNode.getChildCount() > 1 ? (ASTNode) pSpecNode.getChild(1) : null;
+ if ( sortNode != null )
+ {
+ OrderSpec oSpec = processOrderSpec(sortNode);
+ partitioning.setOrderSpec(oSpec);
+ }
+ }
+ else if ( type == HiveParser.TOK_SORTBY || type == HiveParser.TOK_ORDERBY ) {
+ ASTNode sortNode = firstChild;
+ OrderSpec oSpec = processOrderSpec(sortNode);
+ partitioning.setOrderSpec(oSpec);
+ }
+ return partitioning;
+ }
+
+ private WindowFunctionSpec processWindowFunction(ASTNode node, ASTNode wsNode)
+ throws SemanticException {
+ WindowFunctionSpec wfSpec = new WindowFunctionSpec();
+
+ switch(node.getType()) {
+ case HiveParser.TOK_FUNCTIONSTAR:
+ wfSpec.setStar(true);
+ break;
+ case HiveParser.TOK_FUNCTIONDI:
+ wfSpec.setDistinct(true);
+ break;
+ }
+
+ if ( wfSpec.isDistinct() ) {
+ throw new SemanticException(generateErrorMessage(node,
+ "Count/Sum distinct not supported with Windowing"));
+ }
+
+ wfSpec.setExpression(node);
+
+ ASTNode nameNode = (ASTNode) node.getChild(0);
+ wfSpec.setName(nameNode.getText());
+
+ for(int i=1; i < node.getChildCount(); i++) {
+ ASTNode child = (ASTNode) node.getChild(i);
+ wfSpec.addArg(child);
+ }
+
+ if ( wsNode != null ) {
+ WindowSpec ws = processWindowSpec(wsNode);
+ wfSpec.setWindowSpec(ws);
+ }
+
+ /*
+ * In order to distinguish between different invocations of the same UDAF over different Windows,
+ * add the WdwSpec node as a child of the Function Node.
+ * It is safe to do this after the function node has been converted to a WdwFuncSpec.
+ */
+ if ( wsNode != null ) {
+ node.addChild(wsNode);
+ }
+
+ return wfSpec;
+ }
+
+ private void processQueryWindowClause(WindowingSpec spec, ASTNode node)
+ throws SemanticException {
+ ASTNode nameNode = (ASTNode) node.getChild(0);
+ ASTNode wsNode = (ASTNode) node.getChild(1);
+ if(spec.getWindowSpecs() != null && spec.getWindowSpecs().containsKey(nameNode.getText())){
+ throw new SemanticException(generateErrorMessage(nameNode,
+ "Duplicate definition of window " + nameNode.getText() +
+ " is not allowed"));
+ }
+ WindowSpec ws = processWindowSpec(wsNode);
+ spec.addWindowSpec(nameNode.getText(), ws);
+ }
+
+ private WindowSpec processWindowSpec(ASTNode node) throws SemanticException {
+ String sourceId = null;
+ PartitionSpec partition = null;
+ OrderSpec order = null;
+ WindowFrameSpec windowFrame = null;
+
+ boolean hasSrcId = false, hasPartSpec = false, hasWF = false;
+ int srcIdIdx = -1, partIdx = -1, wfIdx = -1;
+
+ for(int i=0; i < node.getChildCount(); i++)
+ {
+ int type = node.getChild(i).getType();
+ switch(type)
+ {
+ case HiveParser.Identifier:
+ hasSrcId = true; srcIdIdx = i;
+ break;
+ case HiveParser.TOK_PARTITIONINGSPEC:
+ hasPartSpec = true; partIdx = i;
+ break;
+ case HiveParser.TOK_WINDOWRANGE:
+ case HiveParser.TOK_WINDOWVALUES:
+ hasWF = true; wfIdx = i;
+ break;
+ }
+ }
+
+ WindowSpec ws = new WindowSpec();
+
+ if (hasSrcId) {
+ ASTNode nameNode = (ASTNode) node.getChild(srcIdIdx);
+ ws.setSourceId(nameNode.getText());
+ }
+
+ if (hasPartSpec) {
+ ASTNode partNode = (ASTNode) node.getChild(partIdx);
+ PartitioningSpec partitioning = processPTFPartitionSpec(partNode);
+ ws.setPartitioning(partitioning);
+ }
+
+ if ( hasWF)
+ {
+ ASTNode wfNode = (ASTNode) node.getChild(wfIdx);
+ WindowFrameSpec wfSpec = processWindowFrame(wfNode);
+ ws.setWindowFrame(wfSpec);
+ }
+
+ return ws;
+ }
+
+ private WindowFrameSpec processWindowFrame(ASTNode node) throws SemanticException {
+ int type = node.getType();
+ BoundarySpec start = null, end = null;
+
+ /*
+ * A WindowFrame may contain just the Start Boundary; in the
+ * BETWEEN style of expressing a WindowFrame, both boundaries
+ * are specified.
+ */
+ start = processBoundary(type, (ASTNode) node.getChild(0));
+ if ( node.getChildCount() > 1 ) {
+ end = processBoundary(type, (ASTNode) node.getChild(1));
+ }
+
+ return new WindowFrameSpec(start, end);
+ }
+
+ private BoundarySpec processBoundary(int frameType, ASTNode node) throws SemanticException {
+ BoundarySpec bs = frameType == HiveParser.TOK_WINDOWRANGE ?
+ new RangeBoundarySpec() : new ValueBoundarySpec();
+ int type = node.getType();
+ boolean hasAmt = true;
+
+ switch(type)
+ {
+ case HiveParser.KW_PRECEDING:
+ bs.setDirection(Direction.PRECEDING);
+ break;
+ case HiveParser.KW_FOLLOWING:
+ bs.setDirection(Direction.FOLLOWING);
+ break;
+ case HiveParser.KW_CURRENT:
+ bs = new CurrentRowSpec();
+ hasAmt = false;
+ break;
+ }
+
+ if ( hasAmt )
+ {
+ ASTNode amtNode = (ASTNode) node.getChild(0);
+ if ( amtNode.getType() == HiveParser.KW_UNBOUNDED)
+ {
+ bs.setAmt(BoundarySpec.UNBOUNDED_AMOUNT);
+ }
+ else
+ {
+ int amt = Integer.parseInt(amtNode.getText());
+ if ( amt < 0 ) {
+ throw new SemanticException(
+ "Window Frame Boundary Amount must be a +ve integer, amount provide is: " + amt);
+ }
+ bs.setAmt(amt);
+ }
+ }
+
+ return bs;
+ }
+
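processWindowFrame and processBoundary above reduce a frame such as ROWS BETWEEN 2 PRECEDING AND CURRENT ROW to a start boundary plus an optional end boundary, each carrying a direction and an amount (with a sentinel for UNBOUNDED). A minimal sketch of that shape; Boundary and Frame are simplified stand-ins for BoundarySpec and WindowFrameSpec, and the UNBOUNDED sentinel value is an assumption of the sketch.

class WindowFrameSketch {
  enum Direction { PRECEDING, CURRENT, FOLLOWING }

  // Simplified boundary holder; the real BoundarySpec subclasses carry more state.
  static class Boundary {
    static final int UNBOUNDED = Integer.MAX_VALUE;  // toy sentinel for "UNBOUNDED"
    final Direction direction;
    final int amt;
    Boundary(Direction direction, int amt) { this.direction = direction; this.amt = amt; }
    @Override public String toString() {
      if (direction == Direction.CURRENT) return "CURRENT ROW";
      String n = (amt == UNBOUNDED) ? "UNBOUNDED" : Integer.toString(amt);
      return n + " " + direction;
    }
  }

  static class Frame {
    final Boundary start;
    final Boundary end;  // may be null when only a start boundary is given
    Frame(Boundary start, Boundary end) { this.start = start; this.end = end; }
  }

  public static void main(String[] args) {
    // ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    Frame between = new Frame(new Boundary(Direction.PRECEDING, 2),
                              new Boundary(Direction.CURRENT, 0));
    System.out.println(between.start + " .. " + between.end);
    // ROWS UNBOUNDED PRECEDING (start boundary only)
    Frame startOnly = new Frame(new Boundary(Direction.PRECEDING, Boundary.UNBOUNDED), null);
    System.out.println(startOnly.start + " .. " + startOnly.end);
  }
}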
+ /*
+ * check if a Select Expr is a constant.
+ * - the current logic is to look for HiveParser.TOK_TABLE_OR_COL
+ * - if there is none then the expression is a constant.
+ */
+ private static class ConstantExprCheck implements ContextVisitor {
+ boolean isConstant = true;
+
+ public void visit(Object t, Object parent, int childIndex, Map labels) {
+ if ( !isConstant ) {
+ return;
+ }
+ ASTNode node = (ASTNode) t;
+ if (ParseDriver.adaptor.getType(t) == HiveParser.TOK_TABLE_OR_COL ) {
+ isConstant = false;
+ }
+ }
+
+ public void reset() {
+ isConstant = true;
+ }
+
+ protected boolean isConstant() {
+ return isConstant;
+ }
+ }
+
+ private static class AggregationExprCheck implements ContextVisitor {
+ HashMap<String, ASTNode> destAggrExprs;
+ boolean isAggr = false;
+
+ public AggregationExprCheck(HashMap<String, ASTNode> destAggrExprs) {
+ super();
+ this.destAggrExprs = destAggrExprs;
+ }
+
+ public void visit(Object t, Object parent, int childIndex, Map labels) {
+ if ( isAggr ) {
+ return;
+ }
+ if ( destAggrExprs.values().contains(t)) {
+ isAggr = true;
+ }
+ }
+
+ public void reset() {
+ isAggr = false;
+ }
+
+ protected boolean isAggr() {
+ return isAggr;
+ }
+ }
+
+ /*
+ * Returns false if there is a SelectExpr that is not a constant or an aggr.
+ *
+ */
+ private boolean isValidGroupBySelectList(QB currQB, String clause){
+ ConstantExprCheck constantExprCheck = new ConstantExprCheck();
+ AggregationExprCheck aggrExprCheck = new AggregationExprCheck(
+ currQB.getParseInfo().getAggregationExprsForClause(clause));
+
+ TreeWizard tw = new TreeWizard(ParseDriver.adaptor, HiveParser.tokenNames);
+ ASTNode selectNode = currQB.getParseInfo().getSelForClause(clause);
+
+ /*
+ * for Select Distinct Queries we don't move any aggregations.
+ */
+ if ( selectNode != null && selectNode.getType() == HiveParser.TOK_SELECTDI ) {
+ return true;
+ }
+
+ for (int i = 0; selectNode != null && i < selectNode.getChildCount(); i++) {
+ ASTNode selectExpr = (ASTNode) selectNode.getChild(i);
+ //check for TOK_HINTLIST expressions on ast
+ if(selectExpr.getType() != HiveParser.TOK_SELEXPR){
+ continue;
+ }
+
+ constantExprCheck.reset();
+ PTFTranslator.visit(selectExpr.getChild(0), constantExprCheck);
+
+ if ( !constantExprCheck.isConstant() ) {
+ aggrExprCheck.reset();
+ PTFTranslator.visit(selectExpr.getChild(0), aggrExprCheck);
+ if (!aggrExprCheck.isAggr() ) {
+ return false;
+ }
+ }
+
+ }
+ return true;
+ }
+
+//--------------------------- PTF handling: PTFInvocationSpec to PTFDesc --------------------------
+
+ private PTFDesc translatePTFInvocationSpec(PTFInvocationSpec ptfQSpec, RowResolver inputRR)
+ throws SemanticException{
+ PTFDesc ptfDesc = null;
+ PTFTranslator translator = new PTFTranslator();
+ ptfDesc = translator.translate(ptfQSpec, this, conf, inputRR, unparseTranslator);
+ return ptfDesc;
+ }
+
+ Operator genPTFPlan(PTFInvocationSpec ptfQSpec, Operator input) throws SemanticException {
+ ArrayList<PTFInvocationSpec> componentQueries = PTFTranslator.componentize(ptfQSpec);
+ for (PTFInvocationSpec ptfSpec : componentQueries) {
+ input = genPTFPlanForComponentQuery(ptfSpec, input);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created PTF Plan ");
+ }
+ return input;
+ }
+
+
+ /**
+ * Construct the data structures containing ExprNodeDesc for partition
+ * columns and order columns. Use the input definition to construct the list
+ * of output columns for the ReduceSinkOperator
+ *
+ * @throws SemanticException
+ */
+ void buildPTFReduceSinkDetails(PartitionedTableFunctionDef tabDef,
+ RowResolver inputRR,
+ ArrayList<ExprNodeDesc> partCols,
+ ArrayList<ExprNodeDesc> valueCols,
+ ArrayList<ExprNodeDesc> orderCols,
+ Map<String, ExprNodeDesc> colExprMap,
+ List<String> outputColumnNames,
+ StringBuilder orderString,
+ RowResolver extractRR) throws SemanticException {
+
+ ArrayList<PTFExpressionDef> partColList = tabDef.getPartition().getExpressions();
+
+ for (PTFExpressionDef colDef : partColList) {
+ partCols.add(colDef.getExprNode());
+ orderCols.add(colDef.getExprNode());
+ orderString.append('+');
+ }
+
+ /*
+ * Order columns are used as key columns for constructing
+ * the ReduceSinkOperator
+ * Since we do not explicitly add these to outputColumnNames,
+ * we need to set includeKeyCols = false while creating the
+ * ReduceSinkDesc
+ */
+ ArrayList<OrderExpressionDef> orderColList = tabDef.getOrder().getExpressions();
+ for (int i = 0; i < orderColList.size(); i++) {
+ OrderExpressionDef colDef = orderColList.get(i);
+ org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = colDef.getOrder();
+ if (order.name().equals("ASC")) {
+ orderString.append('+');
+ } else {
+ orderString.append('-');
+ }
+ orderCols.add(colDef.getExprNode());
+ }
+
+ /*
+ * We add the column to value columns or output column names
+ * only if it is not a virtual column
+ */
+ ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
+ LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+ new LinkedHashMap<String[], ColumnInfo>();
+ int pos = 0;
+ for (ColumnInfo colInfo : colInfoList) {
+ if (!colInfo.isHiddenVirtualCol()) {
+ ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
+ .getInternalName(), colInfo.getTabAlias(), colInfo
+ .getIsVirtualCol());
+ valueCols.add(valueColExpr);
+ colExprMap.put(colInfo.getInternalName(), valueColExpr);
+ String outColName = SemanticAnalyzer.getColumnInternalName(pos++);
+ outputColumnNames.add(outColName);
+
+ String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+ /*
+ * if we have already encountered this colInfo's internalName, skip it here;
+ * it shows up again because it must also be put in for the Having clause.
+ * We add those entries at the end, in a loop over colsAddedByHaving. See below.
+ */
+ if ( colsAddedByHaving.containsKey(alias)) {
+ continue;
+ }
+ ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+ ColumnInfo eColInfo = new ColumnInfo(
+ outColName, colInfo.getType(), alias[0],
+ colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+ if ( astNode == null ) {
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+ else {
+ /*
+ * in case the having clause refers to this column, it may have been added twice:
+ * once with the ASTNode.toStringTree as the alias
+ * and then with the real alias.
+ */
+ extractRR.putExpression(astNode, eColInfo);
+ if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+ colsAddedByHaving.put(alias, eColInfo);
+ }
+ }
+ }
+ }
+
+ for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
+ String[] alias = columnAddedByHaving.getKey();
+ ColumnInfo eColInfo = columnAddedByHaving.getValue();
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+ }
+
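buildPTFReduceSinkDetails above (like genReduceSinkPlanForWindowing further down) encodes the ReduceSink sort directions as a string of '+'/'-' characters, one per key column, with the partition columns first and always ascending. A tiny sketch of that encoding with made-up column names:

import java.util.List;

class OrderStringSketch {
  public static void main(String[] args) {
    // Hypothetical key layout: two partition columns, then two order columns.
    List<String> partitionCols = List.of("p_mfgr", "p_name");
    List<String[]> orderCols = List.of(new String[]{"p_size", "ASC"},
                                       new String[]{"p_retailprice", "DESC"});

    StringBuilder orderString = new StringBuilder();
    for (String ignored : partitionCols) {
      orderString.append('+');                   // partition columns double as ascending keys
    }
    for (String[] oc : orderCols) {
      orderString.append("ASC".equals(oc[1]) ? '+' : '-');
    }
    System.out.println(orderString);             // "+++-"
  }
}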
+ private Operator genPTFPlanForComponentQuery(PTFInvocationSpec ptfQSpec, Operator input)
+ throws SemanticException {
+ /*
+ * 1. Create the PTFDesc from the Qspec attached to this QB.
+ */
+ RowResolver rr = opParseCtx.get(input).getRowResolver();
+ PTFDesc ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+ /*
+ * Build an RR for the Extract Op from the ReduceSink Op's RR.
+ * Why?
+ * We need to remove the Virtual Columns present in the RS's RR. The OI
+ * that gets passed to Extract at runtime doesn't contain the Virtual Columns.
+ * So internal names get changed. Consider testCase testJoinWithLeadLag,
+ * which is a self join on part and also has a Windowing expression.
+ * The RR of the RS op at translation time looks something like this:
+ * (_col1,_col2,..,_col7, _col8(vc=true),_col9(vc=true),
+ * _col10,_col11,.._col15(vc=true),_col16(vc=true),..)
+ * At runtime the Virtual columns are removed and all the columns after _col7
+ * are shifted 1 or 2 positions.
+ * So in child Operators ColumnExprNodeDesc's are no longer referring to the right columns.
+ *
+ * So we build a new RR for the Extract Op, with the Virtual Columns removed.
+ * We hand this to the PTFTranslator as the
+ * starting RR to use to translate a PTF Chain.
+ */
+ RowResolver extractOpRR = new RowResolver();
+
+ /*
+ * 2. build Map-side Op Graph. Graph template is either:
+ * Input -> PTF_map -> ReduceSink
+ * or
+ * Input -> ReduceSink
+ *
+ * Here the ExprNodeDescriptors in the QueryDef are based on the Input Operator's RR.
+ */
+ {
+ PartitionedTableFunctionDef tabDef = ptfDesc.getStartOfChain();
+
+ /*
+ * a. add Map-side PTF Operator if needed
+ */
+ if (tabDef.isTransformsRawInput() )
+ {
+ RowResolver ptfMapRR = tabDef.getRawInputShape().getRr();
+
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+ new RowSchema(ptfMapRR.getColumnInfos()),
+ input), ptfMapRR);
+ rr = opParseCtx.get(input).getRowResolver();
+ }
+
+ /*
+ * b. Build Reduce Sink Details (keyCols, valueCols, outColNames etc.) for this ptfDesc.
+ */
+
+ ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ List<String> outputColumnNames = new ArrayList<String>();
+ StringBuilder orderString = new StringBuilder();
+
+ /*
+ * Use the input RR of TableScanOperator in case there is no map-side
+ * reshape of input.
+ * If the parent of ReduceSinkOperator is PTFOperator, use its
+ * output RR.
+ */
+ buildPTFReduceSinkDetails(tabDef,
+ rr,
+ partCols,
+ valueCols,
+ orderCols,
+ colExprMap,
+ outputColumnNames,
+ orderString,
+ extractOpRR);
+
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+ .getReduceSinkDesc(orderCols,
+ valueCols, outputColumnNames, false,
+ -1, partCols, orderString.toString(), -1),
+ new RowSchema(rr.getColumnInfos()), input), rr);
+ input.setColumnExprMap(colExprMap);
+ }
+
+ /*
+ * 3. build Reduce-side Op Graph
+ */
+ {
+ /*
+ * b. Construct Extract Operator.
+ */
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new ExtractDesc(
+ new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+ Utilities.ReduceField.VALUE
+ .toString(), "", false)),
+ new RowSchema(extractOpRR.getColumnInfos()),
+ input), extractOpRR);
+
+ /*
+ * c. Rebuild the QueryDef.
+ * Why?
+ * - so that the ExprNodeDescriptors in the QueryDef are based on the
+ * Extract Operator's RowResolver
+ */
+ rr = opParseCtx.get(input).getRowResolver();
+ ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+
+ /*
+ * d. Construct PTF Operator.
+ */
+ RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+ new RowSchema(ptfOpRR.getColumnInfos()),
+ input), ptfOpRR);
+
+ }
+
+ return input;
+
+ }
+
+//--------------------------- Windowing handling: PTFInvocationSpec to PTFDesc --------------------
+
+ Operator genWindowingPlan(WindowingSpec wSpec, Operator input) throws SemanticException {
+ wSpec.validateAndMakeEffective();
+ WindowingComponentizer groups = new WindowingComponentizer(wSpec);
+ RowResolver rr = opParseCtx.get(input).getRowResolver();
+
+ while(groups.hasNext() ) {
+ wSpec = groups.next(conf, this, unparseTranslator, rr);
+ input = genReduceSinkPlanForWindowing(wSpec, rr, input);
+ rr = opParseCtx.get(input).getRowResolver();
+ PTFTranslator translator = new PTFTranslator();
+ PTFDesc ptfDesc = translator.translate(wSpec, this, conf, rr, unparseTranslator);
+ RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+ new RowSchema(ptfOpRR.getColumnInfos()),
+ input), ptfOpRR);
+ rr = ptfOpRR;
+ }
+
+ return input;
+ }
+
+ private Operator genReduceSinkPlanForWindowing(WindowingSpec spec,
+ RowResolver inputRR,
+ Operator input) throws SemanticException{
+ ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ List<String> outputColumnNames = new ArrayList<String>();
+ StringBuilder orderString = new StringBuilder();
+
+ ArrayList<PartitionExpression> partColList = spec.getQueryPartitionSpec().getExpressions();
+ for (PartitionExpression partCol : partColList) {
+ ExprNodeDesc partExpr = genExprNodeDesc(partCol.getExpression(), inputRR);
+ partCols.add(partExpr);
+ orderCols.add(partExpr);
+ orderString.append('+');
+ }
+
+ ArrayList<OrderExpression> orderColList = spec.getQueryOrderSpec() == null ?
+ new ArrayList<PTFInvocationSpec.OrderExpression>() :
+ spec.getQueryOrderSpec().getExpressions();
+ for (int i = 0; i < orderColList.size(); i++) {
+ OrderExpression orderCol = orderColList.get(i);
+ org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = orderCol.getOrder();
+ if (order.name().equals("ASC")) {
+ orderString.append('+');
+ } else {
+ orderString.append('-');
+ }
+ ExprNodeDesc orderExpr = genExprNodeDesc(orderCol.getExpression(), inputRR);
+ orderCols.add(orderExpr);
+ }
+
+ ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
+ RowResolver rsNewRR = new RowResolver();
+ int pos = 0;
+ for (ColumnInfo colInfo : colInfoList) {
+ ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
+ .getInternalName(), colInfo.getTabAlias(), colInfo
+ .getIsVirtualCol());
+ valueCols.add(valueColExpr);
+ colExprMap.put(colInfo.getInternalName(), valueColExpr);
+ String outColName = SemanticAnalyzer.getColumnInternalName(pos++);
+ outputColumnNames.add(outColName);
+
+ String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+ ColumnInfo newColInfo = new ColumnInfo(
+ outColName, colInfo.getType(), alias[0],
+ colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+ rsNewRR.put(alias[0], alias[1], newColInfo);
+
+ }
+
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+ .getReduceSinkDesc(orderCols,
+ valueCols, outputColumnNames, false,
+ -1, partCols, orderString.toString(), -1),
+ new RowSchema(inputRR.getColumnInfos()), input), rsNewRR);
+ input.setColumnExprMap(colExprMap);
+
+
+ // Construct the RR for extract operator
+ RowResolver extractRR = new RowResolver();
+ LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+ new LinkedHashMap<String[], ColumnInfo>();
+ pos = 0;
+
+ for (ColumnInfo colInfo : colInfoList) {
+ String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+ /*
+ * if we have already encountered this colInfo's internalName, skip it here;
+ * it shows up again because it must also be put in for the Having clause.
+ * We add those entries at the end, in a loop over colsAddedByHaving. See below.
+ */
+ if ( colsAddedByHaving.containsKey(alias)) {
+ continue;
+ }
+ ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+ ColumnInfo eColInfo = new ColumnInfo(
+ SemanticAnalyzer.getColumnInternalName(pos++), colInfo.getType(), alias[0],
+ colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+ if ( astNode == null ) {
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+ else {
+ /*
+ * in case the having clause refers to this column, it may have been added twice:
+ * once with the ASTNode.toStringTree as the alias
+ * and then with the real alias.
+ */
+ extractRR.putExpression(astNode, eColInfo);
+ if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+ colsAddedByHaving.put(alias, eColInfo);
+ }
+ }
+ }
+
+ for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
+ String[] alias = columnAddedByHaving.getKey();
+ ColumnInfo eColInfo = columnAddedByHaving.getValue();
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+
+ /*
+ * b. Construct Extract Operator.
+ */
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new ExtractDesc(
+ new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+ Utilities.ReduceField.VALUE
+ .toString(), "", false)),
+ new RowSchema(inputRR.getColumnInfos()),
+ input), extractRR);
+
+
+ return input;
+ }
+
+
+ public static ArrayList<WindowExpressionSpec> parseSelect(String selectExprStr)
+ throws SemanticException
+ {
+ ASTNode selNode = null;
+ try {
+ ParseDriver pd = new ParseDriver();
+ selNode = pd.parseSelect(selectExprStr, null);
+ } catch (ParseException pe) {
+ throw new SemanticException(pe);
+ }
+
+ ArrayList<WindowExpressionSpec> selSpec = new ArrayList<WindowExpressionSpec>();
+ int childCount = selNode.getChildCount();
+ for (int i = 0; i < childCount; i++) {
+ ASTNode selExpr = (ASTNode) selNode.getChild(i);
+ if (selExpr.getType() != HiveParser.TOK_SELEXPR) {
+ throw new SemanticException(String.format(
+ "Only Select expressions supported in dynamic select list: %s", selectExprStr));
+ }
+ ASTNode expr = (ASTNode) selExpr.getChild(0);
+ if (expr.getType() == HiveParser.TOK_ALLCOLREF) {
+ throw new SemanticException(
+ String.format("'%s' column not allowed in dynamic select list", selectExprStr));
+ }
+ ASTNode aliasNode = selExpr.getChildCount() > 1
+ && selExpr.getChild(1).getType() == HiveParser.Identifier ?
+ (ASTNode) selExpr.getChild(1) : null;
+ String alias = null;
+ if ( aliasNode != null ) {
+ alias = aliasNode.getText();
+ }
+ else {
+ String[] tabColAlias = getColAlias(selExpr, null, null, true, -1);
+ alias = tabColAlias[1];
+ }
+ WindowExpressionSpec exprSpec = new WindowExpressionSpec();
+ exprSpec.setAlias(alias);
+ exprSpec.setExpression(expr);
+ selSpec.add(exprSpec);
+ }
+
+ return selSpec;
+ }
+
}
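parseSelect is the new public entry point for turning a select-expression string into aliased WindowExpressionSpecs. A usage sketch, assuming the Hive jars are on the classpath and that WindowExpressionSpec exposes getAlias/getExpression getters matching the setters used above:

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;

class ParseSelectSketch {
  public static void main(String[] args) throws SemanticException {
    // Parses a select-expression string into aliased specs; star columns and
    // anything that is not a plain select expression are rejected with a SemanticException.
    ArrayList<WindowExpressionSpec> specs =
        SemanticAnalyzer.parseSelect("p_mfgr, p_retailprice - p_size as margin");
    for (WindowExpressionSpec s : specs) {
      System.out.println(s.getAlias() + " : " + s.getExpression().toStringTree());
    }
  }
}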
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java Fri Apr 5 10:34:08 2013
@@ -492,6 +492,7 @@ public final class TypeCheckProcFactory
static HashMap<Integer, String> specialUnaryOperatorTextHashMap;
static HashMap<Integer, String> specialFunctionTextHashMap;
static HashMap<Integer, String> conversionFunctionTextHashMap;
+ static HashSet<Integer> windowingTokens;
static {
specialUnaryOperatorTextHashMap = new HashMap<Integer, String>();
specialUnaryOperatorTextHashMap.put(HiveParser.PLUS, "positive");
@@ -522,6 +523,22 @@ public final class TypeCheckProcFactory
serdeConstants.TIMESTAMP_TYPE_NAME);
conversionFunctionTextHashMap.put(HiveParser.TOK_DECIMAL,
serdeConstants.DECIMAL_TYPE_NAME);
+
+ windowingTokens = new HashSet<Integer>();
+ windowingTokens.add(HiveParser.KW_OVER);
+ windowingTokens.add(HiveParser.TOK_PARTITIONINGSPEC);
+ windowingTokens.add(HiveParser.TOK_DISTRIBUTEBY);
+ windowingTokens.add(HiveParser.TOK_SORTBY);
+ windowingTokens.add(HiveParser.TOK_CLUSTERBY);
+ windowingTokens.add(HiveParser.TOK_WINDOWSPEC);
+ windowingTokens.add(HiveParser.TOK_WINDOWRANGE);
+ windowingTokens.add(HiveParser.TOK_WINDOWVALUES);
+ windowingTokens.add(HiveParser.KW_UNBOUNDED);
+ windowingTokens.add(HiveParser.KW_PRECEDING);
+ windowingTokens.add(HiveParser.KW_FOLLOWING);
+ windowingTokens.add(HiveParser.KW_CURRENT);
+ windowingTokens.add(HiveParser.TOK_TABSORTCOLNAMEASC);
+ windowingTokens.add(HiveParser.TOK_TABSORTCOLNAMEDESC);
}
private static boolean isRedundantConversionFunction(ASTNode expr,
@@ -865,6 +882,20 @@ public final class TypeCheckProcFactory
ASTNode expr = (ASTNode) nd;
+ /*
+ * A Windowing specification gets added as a child to a UDAF invocation to distinguish it
+ * from otherwise identical UDAF invocations over different windows.
+ * The UDAF is translated to a WindowFunction invocation in the PTFTranslator.
+ * So here we just return null for tokens that appear in a Window Specification.
+ * When the traversal reaches the UDAF invocation, its ExprNodeDesc is built using the
+ * ColumnInfo in the InputRR. This is similar to how UDAFs are handled in Select lists.
+ * The difference is that Window related tokens need no translation here, so we just
+ * return null.
+ */
+ if ( windowingTokens.contains(expr.getType())) {
+ return null;
+ }
+
if (expr.getType() == HiveParser.TOK_TABNAME) {
return null;
}
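
To make the new windowingTokens guard in TypeCheckProcFactory above concrete: every AST token that merely describes the OVER clause is skipped by the expression type-checker, so only the enclosing UDAF call yields an ExprNodeDesc. The stand-alone sketch below repeats the same membership-set idea; it is not part of the commit, the class and method names are made up, and only part of the token set is copied.

    import java.util.HashSet;
    import org.apache.hadoop.hive.ql.parse.HiveParser;

    public class WindowingTokenGuardSketch {
      private static final HashSet<Integer> WINDOWING_TOKENS = new HashSet<Integer>();
      static {
        // Abbreviated copy of the set built in the patch above.
        WINDOWING_TOKENS.add(HiveParser.KW_OVER);
        WINDOWING_TOKENS.add(HiveParser.TOK_PARTITIONINGSPEC);
        WINDOWING_TOKENS.add(HiveParser.TOK_WINDOWSPEC);
        WINDOWING_TOKENS.add(HiveParser.TOK_WINDOWRANGE);
      }

      /**
       * For "sum(x) OVER (PARTITION BY a ORDER BY b ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)"
       * the PARTITION BY, ORDER BY and frame tokens all satisfy this check and produce no
       * ExprNodeDesc; sum(x) itself is later resolved from the input RowResolver.
       */
      static boolean skip(int tokenType) {
        return WINDOWING_TOKENS.contains(tokenType);
      }
    }
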
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java Fri Apr 5 10:34:08 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+
/**
* ColumnStats Work.
*
@@ -29,6 +31,8 @@ public class ColumnStatsWork implements
private static final long serialVersionUID = 1L;
private FetchWork fWork;
private ColumnStatsDesc colStats;
+ private static final int LIMIT = -1;
+
public ColumnStatsWork() {
}
@@ -61,4 +65,21 @@ public class ColumnStatsWork implements
public void setColStats(ColumnStatsDesc colStats) {
this.colStats = colStats;
}
+
+ public ListSinkOperator getSink() {
+ return fWork.getSink();
+ }
+
+ public void initializeForFetch() {
+ fWork.initializeForFetch();
+ }
+
+ public int getLeastNumRows() {
+ return fWork.getLeastNumRows();
+ }
+
+ public static int getLimit() {
+ return LIMIT;
+ }
+
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Fri Apr 5 10:34:08 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -46,7 +47,7 @@ public class TableScanDesc extends Abstr
/**
* Used for split sampling (row count per split)
* For example,
- * select count(1) from ss_src2 tablesample(10 ROWS);
+ * select count(1) from ss_src2 tablesample (10 ROWS) s;
* provides first 10 rows from all input splits
*/
private int rowLimit = -1;
@@ -67,6 +68,9 @@ public class TableScanDesc extends Abstr
public static final String FILTER_TEXT_CONF_STR =
"hive.io.filter.text";
+ // maps an input file name (of the big table) to its bucket number
+ private Map<String, Integer> bucketFileNameMapping;
+
@SuppressWarnings("nls")
public TableScanDesc() {
}
@@ -170,4 +174,12 @@ public class TableScanDesc extends Abstr
public Integer getRowLimitExplain() {
return rowLimit >= 0 ? rowLimit : null;
}
+
+ public Map<String, Integer> getBucketFileNameMapping() {
+ return bucketFileNameMapping;
+ }
+
+ public void setBucketFileNameMapping(Map<String, Integer> bucketFileNameMapping) {
+ this.bucketFileNameMapping = bucketFileNameMapping;
+ }
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Fri Apr 5 10:34:08 2013
@@ -846,6 +846,10 @@ public final class OpProcFactory {
return new DefaultPPD();
}
+ public static NodeProcessor getPTFProc() {
+ return new ScriptPPD();
+ }
+
public static NodeProcessor getSCRProc() {
return new ScriptPPD();
}
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java Fri Apr 5 10:34:08 2013
@@ -21,15 +21,15 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
-import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -90,6 +90,9 @@ public class PredicatePushDown implement
opRules.put(new RuleRegExp("R1",
FilterOperator.getOperatorName() + "%"),
OpProcFactory.getFilterProc());
+ opRules.put(new RuleRegExp("R2",
+ PTFOperator.getOperatorName() + "%"),
+ OpProcFactory.getPTFProc());
opRules.put(new RuleRegExp("R3",
CommonJoinOperator.getOperatorName() + "%"),
OpProcFactory.getJoinProc());
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFRound.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFRound.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFRound.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFRound.java Fri Apr 5 10:34:08 2013
@@ -24,8 +24,11 @@ import java.math.RoundingMode;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.serde2.io.BigDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
/**
* UDFRound.
@@ -38,6 +41,10 @@ import org.apache.hadoop.io.IntWritable;
public class UDFRound extends UDF {
private final BigDecimalWritable bigDecimalWritable = new BigDecimalWritable();
private final DoubleWritable doubleWritable = new DoubleWritable();
+ private final LongWritable longWritable = new LongWritable();
+ private final IntWritable intWritable = new IntWritable();
+ private final ShortWritable shortWritable = new ShortWritable();
+ private final ByteWritable byteWritable = new ByteWritable();
public UDFRound() {
}
@@ -48,7 +55,7 @@ public class UDFRound extends UDF {
doubleWritable.set(d);
} else {
doubleWritable.set(BigDecimal.valueOf(d).setScale(i,
- RoundingMode.HALF_UP).doubleValue());
+ RoundingMode.HALF_UP).doubleValue());
}
return doubleWritable;
}
@@ -88,4 +95,41 @@ public class UDFRound extends UDF {
return evaluate(n, i.get());
}
+ public LongWritable evaluate(LongWritable n) {
+ if (n == null) {
+ return null;
+ }
+ longWritable.set(BigDecimal.valueOf(n.get()).setScale(0,
+ RoundingMode.HALF_UP).longValue());
+ return longWritable;
+ }
+
+ public IntWritable evaluate(IntWritable n) {
+ if (n == null) {
+ return null;
+ }
+ intWritable.set(BigDecimal.valueOf(n.get()).setScale(0,
+ RoundingMode.HALF_UP).intValue());
+ return intWritable;
+ }
+
+ public ShortWritable evaluate(ShortWritable n) {
+ if (n == null) {
+ return null;
+ }
+ shortWritable.set(BigDecimal.valueOf(n.get()).setScale(0,
+ RoundingMode.HALF_UP).shortValue());
+ return shortWritable;
+ }
+
+ public ByteWritable evaluate(ByteWritable n) {
+ if (n == null) {
+ return null;
+ }
+ byteWritable.set(BigDecimal.valueOf(n.get()).setScale(0,
+ RoundingMode.HALF_UP).byteValue());
+ return byteWritable;
+ }
+
}
+
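
The new integral overloads of UDFRound.evaluate above presumably exist so that round() keeps the input's Writable type rather than going through the double path. Since setScale(0, HALF_UP) on a value that is already integral is an identity, round(bigint/int/smallint/tinyint) simply returns the input value. A small illustration, not part of the commit (the class name and sample value are made up):

    import java.math.BigDecimal;
    import java.math.RoundingMode;
    import org.apache.hadoop.hive.ql.udf.UDFRound;
    import org.apache.hadoop.io.LongWritable;

    public class UDFRoundIntegralSketch {
      public static void main(String[] args) {
        // setScale(0, HALF_UP) leaves an integral value untouched ...
        System.out.println(
            BigDecimal.valueOf(42L).setScale(0, RoundingMode.HALF_UP).longValue()); // 42

        // ... so the new overload echoes the value back, still as a LongWritable.
        UDFRound round = new UDFRound();
        System.out.println(round.evaluate(new LongWritable(42L))); // 42
      }
    }
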
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java Fri Apr 5 10:34:08 2013
@@ -229,24 +229,32 @@ public class GenericUDAFComputeStats ext
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
Object p = parameters[0];
BooleanStatsAgg myagg = (BooleanStatsAgg) agg;
- if (p == null) {
- myagg.countNulls++;
+ boolean emptyTable = false;
+
+ if (parameters[1] == null) {
+ emptyTable = true;
}
- else {
- try {
- boolean v = PrimitiveObjectInspectorUtils.getBoolean(p, inputOI);
- if (v == false) {
- myagg.countFalses++;
- } else if (v == true){
- myagg.countTrues++;
- }
- } catch (NumberFormatException e) {
- if (!warned) {
- warned = true;
- LOG.warn(getClass().getSimpleName() + " "
- + StringUtils.stringifyException(e));
- LOG.warn(getClass().getSimpleName()
- + " ignoring similar exceptions.");
+
+ if (!emptyTable) {
+ if (p == null) {
+ myagg.countNulls++;
+ }
+ else {
+ try {
+ boolean v = PrimitiveObjectInspectorUtils.getBoolean(p, inputOI);
+ if (v == false) {
+ myagg.countFalses++;
+ } else if (v == true){
+ myagg.countTrues++;
+ }
+ } catch (NumberFormatException e) {
+ if (!warned) {
+ warned = true;
+ LOG.warn(getClass().getSimpleName() + " "
+ + StringUtils.stringifyException(e));
+ LOG.warn(getClass().getSimpleName()
+ + " ignoring similar exceptions.");
+ }
}
}
}
@@ -472,14 +480,24 @@ public class GenericUDAFComputeStats ext
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
Object p = parameters[0];
LongStatsAgg myagg = (LongStatsAgg) agg;
+ boolean emptyTable = false;
+
+ if (parameters[1] == null) {
+ emptyTable = true;
+ }
if (myagg.firstItem) {
- int numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+ int numVectors = 0;
+ if (!emptyTable) {
+ numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+ }
initNDVEstimator(myagg, numVectors);
myagg.firstItem = false;
myagg.numBitVectors = numVectors;
}
+ if (!emptyTable) {
+
//Update null counter if a null value is seen
if (p == null) {
myagg.countNulls++;
@@ -511,6 +529,7 @@ public class GenericUDAFComputeStats ext
}
}
}
+ }
}
@Override
@@ -572,7 +591,11 @@ public class GenericUDAFComputeStats ext
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
LongStatsAgg myagg = (LongStatsAgg) agg;
- long numDV = myagg.numDV.estimateNumDistinctValues();
+
+ long numDV = 0;
+ if (myagg.numBitVectors != 0) {
+ numDV = myagg.numDV.estimateNumDistinctValues();
+ }
// Serialize the result struct
((Text) result[0]).set(myagg.columnType);
@@ -770,42 +793,54 @@ public class GenericUDAFComputeStats ext
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
Object p = parameters[0];
DoubleStatsAgg myagg = (DoubleStatsAgg) agg;
+ boolean emptyTable = false;
+
+ if (parameters[1] == null) {
+ emptyTable = true;
+ }
if (myagg.firstItem) {
- int numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+ int numVectors = 0;
+ if (!emptyTable) {
+ numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+ }
initNDVEstimator(myagg, numVectors);
myagg.firstItem = false;
myagg.numBitVectors = numVectors;
}
- //Update null counter if a null value is seen
- if (p == null) {
- myagg.countNulls++;
- }
- else {
- try {
- double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
-
- //Update min counter if new value is less than min seen so far
- if (v < myagg.min) {
- myagg.min = v;
- }
+ if (!emptyTable) {
- //Update max counter if new value is greater than max seen so far
- if (v > myagg.max) {
- myagg.max = v;
- }
+ //Update null counter if a null value is seen
+ if (p == null) {
+ myagg.countNulls++;
+ }
+ else {
+ try {
- // Add value to NumDistinctValue Estimator
- myagg.numDV.addToEstimator(v);
+ double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
- } catch (NumberFormatException e) {
- if (!warned) {
- warned = true;
- LOG.warn(getClass().getSimpleName() + " "
- + StringUtils.stringifyException(e));
- LOG.warn(getClass().getSimpleName()
- + " ignoring similar exceptions.");
+ //Update min counter if new value is less than min seen so far
+ if (v < myagg.min) {
+ myagg.min = v;
+ }
+
+ //Update max counter if new value is greater than max seen so far
+ if (v > myagg.max) {
+ myagg.max = v;
+ }
+
+ // Add value to NumDistinctValue Estimator
+ myagg.numDV.addToEstimator(v);
+
+ } catch (NumberFormatException e) {
+ if (!warned) {
+ warned = true;
+ LOG.warn(getClass().getSimpleName() + " "
+ + StringUtils.stringifyException(e));
+ LOG.warn(getClass().getSimpleName()
+ + " ignoring similar exceptions.");
+ }
}
}
}
@@ -870,7 +905,11 @@ public class GenericUDAFComputeStats ext
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
DoubleStatsAgg myagg = (DoubleStatsAgg) agg;
- long numDV = myagg.numDV.estimateNumDistinctValues();
+ long numDV = 0;
+
+ if (myagg.numBitVectors != 0) {
+ numDV = myagg.numDV.estimateNumDistinctValues();
+ }
// Serialize the result struct
((Text) result[0]).set(myagg.columnType);
@@ -1082,44 +1121,56 @@ public class GenericUDAFComputeStats ext
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
Object p = parameters[0];
StringStatsAgg myagg = (StringStatsAgg) agg;
+ boolean emptyTable = false;
+
+ if (parameters[1] == null) {
+ emptyTable = true;
+ }
if (myagg.firstItem) {
- int numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+ int numVectors = 0;
+ if (!emptyTable) {
+ numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+ }
initNDVEstimator(myagg, numVectors);
myagg.firstItem = false;
myagg.numBitVectors = numVectors;
}
- // Update null counter if a null value is seen
- if (p == null) {
- myagg.countNulls++;
- }
- else {
- try {
- String v = PrimitiveObjectInspectorUtils.getString(p, inputOI);
-
- // Update max length if new length is greater than the ones seen so far
- int len = v.length();
- if (len > myagg.maxLength) {
- myagg.maxLength = len;
- }
-
- // Update sum length with the new length
- myagg.sumLength += len;
+ if (!emptyTable) {
- // Increment count of values seen so far
- myagg.count++;
+ // Update null counter if a null value is seen
+ if (p == null) {
+ myagg.countNulls++;
+ }
+ else {
+ try {
- // Add string value to NumDistinctValue Estimator
- myagg.numDV.addToEstimator(v);
+ String v = PrimitiveObjectInspectorUtils.getString(p, inputOI);
- } catch (NumberFormatException e) {
- if (!warned) {
- warned = true;
- LOG.warn(getClass().getSimpleName() + " "
- + StringUtils.stringifyException(e));
- LOG.warn(getClass().getSimpleName()
- + " ignoring similar exceptions.");
+ // Update max length if new length is greater than the ones seen so far
+ int len = v.length();
+ if (len > myagg.maxLength) {
+ myagg.maxLength = len;
+ }
+
+ // Update sum length with the new length
+ myagg.sumLength += len;
+
+ // Increment count of values seen so far
+ myagg.count++;
+
+ // Add string value to NumDistinctValue Estimator
+ myagg.numDV.addToEstimator(v);
+
+ } catch (NumberFormatException e) {
+ if (!warned) {
+ warned = true;
+ LOG.warn(getClass().getSimpleName() + " "
+ + StringUtils.stringifyException(e));
+ LOG.warn(getClass().getSimpleName()
+ + " ignoring similar exceptions.");
+ }
}
}
}
@@ -1186,8 +1237,18 @@ public class GenericUDAFComputeStats ext
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
StringStatsAgg myagg = (StringStatsAgg) agg;
- long numDV = myagg.numDV.estimateNumDistinctValues();
- double avgLength = (double)(myagg.sumLength/(1.0 * (myagg.count + myagg.countNulls)));
+
+ long numDV = 0;
+ double avgLength = 0.0;
+ long total = myagg.count + myagg.countNulls;
+
+ if (myagg.numBitVectors != 0) {
+ numDV = myagg.numDV.estimateNumDistinctValues();
+ }
+
+ if (total != 0) {
+ avgLength = (double)(myagg.sumLength / (1.0 * total));
+ }
// Serialize the result struct
((Text) result[0]).set(myagg.columnType);
@@ -1347,34 +1408,41 @@ public class GenericUDAFComputeStats ext
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
Object p = parameters[0];
BinaryStatsAgg myagg = (BinaryStatsAgg) agg;
+ boolean emptyTable = false;
- // Update null counter if a null value is seen
- if (p == null) {
- myagg.countNulls++;
+ if (parameters[1] == null) {
+ emptyTable = true;
}
- else {
- try {
- BytesWritable v = PrimitiveObjectInspectorUtils.getBinary(p, inputOI);
-
- // Update max length if new length is greater than the ones seen so far
- int len = v.getLength();
- if (len > myagg.maxLength) {
- myagg.maxLength = len;
- }
-
- // Update sum length with the new length
- myagg.sumLength += len;
- // Increment count of values seen so far
- myagg.count++;
-
- } catch (NumberFormatException e) {
- if (!warned) {
- warned = true;
- LOG.warn(getClass().getSimpleName() + " "
- + StringUtils.stringifyException(e));
- LOG.warn(getClass().getSimpleName()
- + " ignoring similar exceptions.");
+ if (!emptyTable) {
+ // Update null counter if a null value is seen
+ if (p == null) {
+ myagg.countNulls++;
+ }
+ else {
+ try {
+ BytesWritable v = PrimitiveObjectInspectorUtils.getBinary(p, inputOI);
+
+ // Update max length if new length is greater than the ones seen so far
+ int len = v.getLength();
+ if (len > myagg.maxLength) {
+ myagg.maxLength = len;
+ }
+
+ // Update sum length with the new length
+ myagg.sumLength += len;
+
+ // Increment count of values seen so far
+ myagg.count++;
+
+ } catch (NumberFormatException e) {
+ if (!warned) {
+ warned = true;
+ LOG.warn(getClass().getSimpleName() + " "
+ + StringUtils.stringifyException(e));
+ LOG.warn(getClass().getSimpleName()
+ + " ignoring similar exceptions.");
+ }
}
}
}
@@ -1440,7 +1508,12 @@ public class GenericUDAFComputeStats ext
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
BinaryStatsAgg myagg = (BinaryStatsAgg) agg;
- double avgLength = (double)(myagg.sumLength/(1.0 * (myagg.count + myagg.countNulls)));
+ double avgLength = 0.0;
+ long count = myagg.count + myagg.countNulls;
+
+ if (count != 0) {
+ avgLength = (double)(myagg.sumLength / (1.0 * (myagg.count + myagg.countNulls)));
+ }
// Serialize the result struct
((Text) result[0]).set(myagg.columnType);