You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/02 16:16:37 UTC
svn commit: r1463556 [6/15] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/if/
ql/src/gen/thrift/gen-cpp/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/gen/thrift/gen-php/ ql/src/gen/thrift/gen...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Apr 2 14:16:34 2013
@@ -35,6 +35,8 @@ import java.util.regex.PatternSyntaxExce
import org.antlr.runtime.tree.BaseTree;
import org.antlr.runtime.tree.Tree;
+import org.antlr.runtime.tree.TreeWizard;
+import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
@@ -80,6 +82,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
@@ -117,6 +120,24 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFInputSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionedTableFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.CurrentRowSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.ValueBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFrameSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -152,6 +173,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.OrderExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -332,7 +357,9 @@ public class SemanticAnalyzer extends Ba
switch (ast.getToken().getType()) {
case HiveParser.TOK_QUERY: {
QB qb = new QB(id, alias, true);
- doPhase1(ast, qb, initPhase1Ctx());
+ Phase1Ctx ctx_1 = initPhase1Ctx();
+ doPhase1(ast, qb, ctx_1);
+
qbexpr.setOpcode(QBExpr.Opcode.NULLOP);
qbexpr.setQB(qb);
}
@@ -358,13 +385,28 @@ public class SemanticAnalyzer extends Ba
}
private LinkedHashMap<String, ASTNode> doPhase1GetAggregationsFromSelect(
- ASTNode selExpr) {
+ ASTNode selExpr, QB qb, String dest) {
+
// Iterate over the selects search for aggregation Trees.
// Use String as keys to eliminate duplicate trees.
LinkedHashMap<String, ASTNode> aggregationTrees = new LinkedHashMap<String, ASTNode>();
for (int i = 0; i < selExpr.getChildCount(); ++i) {
- ASTNode sel = (ASTNode) selExpr.getChild(i).getChild(0);
- doPhase1GetAllAggregations(sel, aggregationTrees);
+ ASTNode sel = (ASTNode) selExpr.getChild(i);
+ doPhase1GetAllAggregations((ASTNode) sel.getChild(0), aggregationTrees);
+ }
+
+ /*
+ * remove any aggregation to be handled by Windowing.
+ */
+ if ( queryProperties.hasWindowing() && qb.getWindowingSpec(dest) != null ) {
+ HashMap<String, ASTNode> aliasToWdwExprs = qb.getParseInfo().getWindowingExprsForClause(dest);
+ LinkedHashMap<String, ASTNode> aggTreesMinusWindowing = new LinkedHashMap<String, ASTNode>();
+ for(Map.Entry<String,ASTNode> entry : aggregationTrees.entrySet()) {
+ if ( !aliasToWdwExprs.containsKey(entry.getKey())) {
+ aggTreesMinusWindowing.put(entry.getKey(), entry.getValue());
+ }
+ }
+ aggregationTrees = aggTreesMinusWindowing;
}
return aggregationTrees;
}
@@ -650,6 +692,17 @@ public class SemanticAnalyzer extends Ba
processTable(qb, child);
} else if (child.getToken().getType() == HiveParser.TOK_SUBQUERY) {
processSubQuery(qb, child);
+ } else if (child.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) {
+ queryProperties.setHasPTF(true);
+ processPTF(qb, child);
+ PTFInvocationSpec ptfInvocationSpec = qb.getPTFInvocationSpec(child);
+ String inputAlias = ptfInvocationSpec == null ? null :
+ ((PartitionedTableFunctionSpec)ptfInvocationSpec.getFunction()).getAlias();;
+ if ( inputAlias == null ) {
+ throw new SemanticException(generateErrorMessage(child,
+ "PTF invocation in a Join must have an alias"));
+ }
+
} else if (child.getToken().getType() == HiveParser.TOK_LATERAL_VIEW) {
// SELECT * FROM src1 LATERAL VIEW udtf() AS myTable JOIN src2 ...
// is not supported. Instead, the lateral view must be in a subquery
@@ -741,11 +794,14 @@ public class SemanticAnalyzer extends Ba
qbp.setHints((ASTNode) ast.getChild(0));
}
- LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast);
+ handleWindowingExprsInSelectList(qb, ctx_1.dest, ast);
+
+ LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast,
+ qb, ctx_1.dest);
doPhase1GetColumnAliasesFromSelect(ast, qbp);
qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
qbp.setDistinctFuncExprsForClause(ctx_1.dest,
- doPhase1GetDistinctFuncExprs(aggregations));
+ doPhase1GetDistinctFuncExprs(aggregations));
break;
case HiveParser.TOK_WHERE:
@@ -770,7 +826,6 @@ public class SemanticAnalyzer extends Ba
.getMsg(ast));
}
}
-
qbp.setDestForClause(ctx_1.dest, (ASTNode) ast.getChild(0));
break;
@@ -793,6 +848,9 @@ public class SemanticAnalyzer extends Ba
queryProperties.setHasJoin(true);
processJoin(qb, frm);
qbp.setJoinExpr(frm);
+ }else if(frm.getToken().getType() == HiveParser.TOK_PTBLFUNCTION){
+ queryProperties.setHasPTF(true);
+ processPTF(qb, frm);
}
break;
@@ -819,7 +877,7 @@ public class SemanticAnalyzer extends Ba
break;
case HiveParser.TOK_SORTBY:
- // Get the sort by aliases - these are aliased to the entries in the
+ // Get the sort by aliases - these are aliased to the entries in the
// select list
queryProperties.setHasSortBy(true);
qbp.setSortByExprForClause(ctx_1.dest, ast);
@@ -873,7 +931,16 @@ public class SemanticAnalyzer extends Ba
case HiveParser.TOK_HAVING:
qbp.setHavingExprForClause(ctx_1.dest, ast);
- qbp.addAggregationExprsForClause(ctx_1.dest, doPhase1GetAggregationsFromSelect(ast));
+ qbp.addAggregationExprsForClause(ctx_1.dest,
+ doPhase1GetAggregationsFromSelect(ast, qb, ctx_1.dest));
+ break;
+
+ case HiveParser.KW_WINDOW:
+ if (!qb.hasWindowingSpec(ctx_1.dest) ) {
+ throw new SemanticException(generateErrorMessage(ast,
+ "Query has no Cluster/Distribute By; but has a Window definition"));
+ }
+ handleQueryWindowClauses(qb, ctx_1, ast);
break;
case HiveParser.TOK_LIMIT:
@@ -2193,12 +2260,20 @@ public class SemanticAnalyzer extends Ba
List<ASTNode> result = new ArrayList<ASTNode>(selectExprs == null ? 0
: selectExprs.getChildCount());
if (selectExprs != null) {
+ HashMap<String, ASTNode> windowingExprs = parseInfo.getWindowingExprsForClause(dest);
+
for (int i = 0; i < selectExprs.getChildCount(); ++i) {
if (((ASTNode) selectExprs.getChild(i)).getToken().getType() == HiveParser.TOK_HINTLIST) {
continue;
}
// table.column AS alias
ASTNode grpbyExpr = (ASTNode) selectExprs.getChild(i).getChild(0);
+ /*
+ * If this is handled by Windowing then ignore it.
+ */
+ if (windowingExprs != null && windowingExprs.containsKey(grpbyExpr.toStringTree())) {
+ continue;
+ }
result.add(grpbyExpr);
}
}
@@ -2225,7 +2300,10 @@ public class SemanticAnalyzer extends Ba
String tabAlias = null;
String[] colRef = new String[2];
- if (selExpr.getChildCount() == 2) {
+ //for queries with a windowing expressions, the selexpr may have a third child
+ if (selExpr.getChildCount() == 2 ||
+ (selExpr.getChildCount() == 3 &&
+ selExpr.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC)) {
// return zz for "xx + yy AS zz"
colAlias = unescapeIdentifier(selExpr.getChild(1).getText());
colRef[0] = tabAlias;
@@ -2302,6 +2380,7 @@ public class SemanticAnalyzer extends Ba
return false;
}
+
private Operator<?> genSelectPlan(String dest, QB qb, Operator<?> input)
throws SemanticException {
ASTNode selExprList = qb.getParseInfo().getSelForClause(dest);
@@ -2439,11 +2518,14 @@ public class SemanticAnalyzer extends Ba
// child can be EXPR AS ALIAS, or EXPR.
ASTNode child = (ASTNode) exprList.getChild(i);
boolean hasAsClause = (!isInTransform) && (child.getChildCount() == 2);
+ boolean isWindowSpec = child.getChildCount() == 3 ?
+ (child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC) :
+ false;
// EXPR AS (ALIAS,...) parses, but is only allowed for UDTF's
// This check is not needed and invalid when there is a transform b/c the
// AST's are slightly different.
- if (!isInTransform && !isUDTF && child.getChildCount() > 2) {
+ if (!isWindowSpec && !isInTransform && !isUDTF && child.getChildCount() > 2) {
throw new SemanticException(generateErrorMessage(
(ASTNode) child.getChild(2),
ErrorMsg.INVALID_AS.getMsg()));
@@ -6290,12 +6372,18 @@ public class SemanticAnalyzer extends Ba
ASTNode right = (ASTNode) joinParseTree.getChild(1);
if ((left.getToken().getType() == HiveParser.TOK_TABREF)
- || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
+ || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)
+ || (left.getToken().getType() == HiveParser.TOK_PTBLFUNCTION)) {
String tableName = getUnescapedUnqualifiedTableName((ASTNode) left.getChild(0))
.toLowerCase();
String alias = left.getChildCount() == 1 ? tableName
: unescapeIdentifier(left.getChild(left.getChildCount() - 1)
- .getText().toLowerCase());
+ .getText().toLowerCase());
+ // ptf node form is: ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+ // guaranteed to have an alias here: check done in processJoin
+ alias = (left.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) ?
+ unescapeIdentifier(left.getChild(1).getText().toLowerCase()) :
+ alias;
joinTree.setLeftAlias(alias);
String[] leftAliases = new String[1];
leftAliases[0] = alias;
@@ -6321,12 +6409,18 @@ public class SemanticAnalyzer extends Ba
}
if ((right.getToken().getType() == HiveParser.TOK_TABREF)
- || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
+ || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)
+ || (right.getToken().getType() == HiveParser.TOK_PTBLFUNCTION)) {
String tableName = getUnescapedUnqualifiedTableName((ASTNode) right.getChild(0))
.toLowerCase();
String alias = right.getChildCount() == 1 ? tableName
: unescapeIdentifier(right.getChild(right.getChildCount() - 1)
- .getText().toLowerCase());
+ .getText().toLowerCase());
+ // ptf node form is: ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+ // guaranteed to have an alias here: check done in processJoin
+ alias = (right.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) ?
+ unescapeIdentifier(right.getChild(1).getText().toLowerCase()) :
+ alias;
String[] rightAliases = new String[1];
rightAliases[0] = alias;
joinTree.setRightAliases(rightAliases);
@@ -7109,6 +7203,11 @@ public class SemanticAnalyzer extends Ba
curr = genHavingPlan(dest, qb, curr);
}
+
+ if(queryProperties.hasWindowing() && qb.getWindowingSpec(dest) != null) {
+ curr = genWindowingPlan(qb.getWindowingSpec(dest), curr);
+ }
+
curr = genSelectPlan(dest, qb, curr);
Integer limit = qbp.getDestLimit(dest);
@@ -7148,6 +7247,7 @@ public class SemanticAnalyzer extends Ba
curr = genReduceSinkPlan(dest, qb, curr, numReducers);
}
+
if (qbp.getIsSubQ()) {
if (limit != null) {
// In case of order by, only 1 reducer is used, so no need of
@@ -7793,11 +7893,38 @@ public class SemanticAnalyzer extends Ba
aliasToOpInfo.put(alias, op);
}
+ Operator srcOpInfo = null;
+ Operator lastPTFOp = null;
+
+ if(queryProperties.hasPTF()){
+ //After processing subqueries and source tables, process
+ // partitioned table functions
+
+ HashMap<ASTNode, PTFInvocationSpec> ptfNodeToSpec = qb.getPTFNodeToSpec();
+ if ( ptfNodeToSpec != null ) {
+ for(Entry<ASTNode, PTFInvocationSpec> entry : ptfNodeToSpec.entrySet()) {
+ ASTNode ast = entry.getKey();
+ PTFInvocationSpec spec = entry.getValue();
+ String inputAlias = spec.getQueryInputName();
+ Operator inOp = aliasToOpInfo.get(inputAlias);
+ if ( inOp == null ) {
+ throw new SemanticException(generateErrorMessage(ast,
+ "Cannot resolve input Operator for PTF invocation"));
+ }
+ lastPTFOp = genPTFPlan(spec, inOp);
+ String ptfAlias = ((PartitionedTableFunctionSpec)spec.getFunction()).getAlias();
+ if ( ptfAlias != null ) {
+ aliasToOpInfo.put(ptfAlias, lastPTFOp);
+ }
+ }
+ }
+
+ }
+
// For all the source tables that have a lateral view, attach the
// appropriate operators to the TS
genLateralViewPlans(aliasToOpInfo, qb);
- Operator srcOpInfo = null;
// process join
if (qb.getParseInfo().getJoinExpr() != null) {
@@ -7820,6 +7947,9 @@ public class SemanticAnalyzer extends Ba
// Now if there are more than 1 sources then we have a join case
// later we can extend this to the union all case as well
srcOpInfo = aliasToOpInfo.values().iterator().next();
+ // with ptfs, there maybe more (note for PTFChains:
+ // 1 ptf invocation may entail multiple PTF operators)
+ srcOpInfo = lastPTFOp != null ? lastPTFOp : srcOpInfo;
}
Operator bodyOpInfo = genBodyPlan(qb, srcOpInfo);
@@ -8478,7 +8608,8 @@ public class SemanticAnalyzer extends Ba
}
// continue analyzing from the child ASTNode.
- if (!doPhase1(child, qb, initPhase1Ctx())) {
+ Phase1Ctx ctx_1 = initPhase1Ctx();
+ if (!doPhase1(child, qb, ctx_1)) {
// if phase1Result false return
return;
}
@@ -9672,4 +9803,1040 @@ public class SemanticAnalyzer extends Ba
public void setQB(QB qb) {
this.qb = qb;
}
+
+//--------------------------- PTF handling -----------------------------------
+
+ /*
+  * Translate the input (source) child of a PTF invocation into a PTFInputSpec.
+  * - TOK_TABREF: processTable is invoked and the alias it returns becomes the
+  *   source of a TABLE-typed PTFQueryInputSpec.
+  * - TOK_SUBQUERY: processSubQuery is invoked and the alias it returns becomes the
+  *   source of a SUBQUERY-typed PTFQueryInputSpec.
+  * - TOK_PTBLFUNCTION: the nested invocation is handed to processPTFChain.
+  * Any other node type is rejected with a SemanticException.
+  * The returned spec always has inputNode recorded as its AST node.
+  */
+ private PTFInputSpec processPTFSource(QB qb, ASTNode inputNode) throws SemanticException{
+
+   final PTFInputSpec source;
+
+   switch (inputNode.getType()) {
+   case HiveParser.TOK_TABREF: {
+     String tableAlias = processTable(qb, inputNode);
+     PTFQueryInputSpec tableInput = new PTFQueryInputSpec();
+     tableInput.setType(PTFQueryInputType.TABLE);
+     tableInput.setSource(tableAlias);
+     source = tableInput;
+     break;
+   }
+   case HiveParser.TOK_SUBQUERY: {
+     String subQueryAlias = processSubQuery(qb, inputNode);
+     PTFQueryInputSpec subQueryInput = new PTFQueryInputSpec();
+     subQueryInput.setType(PTFQueryInputType.SUBQUERY);
+     subQueryInput.setSource(subQueryAlias);
+     source = subQueryInput;
+     break;
+   }
+   case HiveParser.TOK_PTBLFUNCTION:
+     source = processPTFChain(qb, inputNode);
+     break;
+   default:
+     throw new SemanticException(generateErrorMessage(inputNode,
+         "Unknown input type to PTF"));
+   }
+
+   source.setAstNode(inputNode);
+   return source;
+
+ }
+
+ /*
+  * Build the PartitionedTableFunctionSpec for one PTF invocation node.
+  * - tree form is
+  *   ^(TOK_PTBLFUNCTION name alias? partitionTableFunctionSource partitioningSpec? arguments*)
+  * - a partitionTableFunctionSource can be a tableReference, a SubQuery or another
+  *   PTF invocation; it is translated by processPTFSource.
+  * - throws SemanticException when the node has fewer than two children or when
+  *   no input child follows the name (and optional alias).
+  */
+ private PartitionedTableFunctionSpec processPTFChain(QB qb, ASTNode ptf)
+     throws SemanticException{
+   int child_count = ptf.getChildCount();
+   if (child_count < 2) {
+     throw new SemanticException(generateErrorMessage(ptf,
+         "Not enough Children " + child_count));
+   }
+
+   PartitionedTableFunctionSpec ptfSpec = new PartitionedTableFunctionSpec();
+   ptfSpec.setAstNode(ptf);
+
+   /*
+    * name
+    */
+   ASTNode nameNode = (ASTNode) ptf.getChild(0);
+   ptfSpec.setName(nameNode.getText());
+
+   int inputIdx = 1;
+
+   /*
+    * alias: optional; when present it is an Identifier in second position and
+    * pushes the input one slot to the right.
+    */
+   ASTNode secondChild = (ASTNode) ptf.getChild(1);
+   if ( secondChild.getType() == HiveParser.Identifier ) {
+     ptfSpec.setAlias(secondChild.getText());
+     inputIdx++;
+   }
+
+   /*
+    * input: with an alias present the two-children check above is not enough,
+    * so verify that the input child really exists before dereferencing it.
+    */
+   if (inputIdx >= child_count) {
+     throw new SemanticException(generateErrorMessage(ptf,
+         "PTF invocation has no input; number of children is " + child_count));
+   }
+   ASTNode inputNode = (ASTNode) ptf.getChild(inputIdx);
+   ptfSpec.setInput(processPTFSource(qb, inputNode));
+
+   int argStartIdx = inputIdx + 1;
+
+   /*
+    * partitioning Spec: optional; if the child right after the input is a
+    * TOK_PARTITIONINGSPEC it is consumed and the arguments start one later.
+    */
+   if (argStartIdx < child_count) {
+     ASTNode pSpecNode = (ASTNode) ptf.getChild(argStartIdx);
+     if (pSpecNode.getType() == HiveParser.TOK_PARTITIONINGSPEC) {
+       PartitioningSpec partitioning = processPTFPartitionSpec(pSpecNode);
+       ptfSpec.setPartitioning(partitioning);
+       argStartIdx++;
+     }
+   }
+
+   /*
+    * arguments: every remaining child
+    */
+   for (int i = argStartIdx; i < child_count; i++) {
+     ptfSpec.addArg((ASTNode) ptf.getChild(i));
+   }
+   return ptfSpec;
+ }
+
+ /*
+  * - invoked during FROM AST tree processing, on encountering a PTF invocation
+  *   (a TOK_PTBLFUNCTION node).
+  * - builds the function spec for the invocation via processPTFChain, adds the
+  *   function's alias (if it has one) to the QB, and records a new
+  *   PTFInvocationSpec for this node in the QB.
+  */
+ private void processPTF(QB qb, ASTNode ptf) throws SemanticException{
+
+   PartitionedTableFunctionSpec function = processPTFChain(qb, ptf);
+
+   String ptfAlias = function.getAlias();
+   if ( ptfAlias != null ) {
+     qb.addAlias(ptfAlias);
+   }
+
+   PTFInvocationSpec invocation = new PTFInvocationSpec();
+   invocation.setFunction(function);
+   qb.addPTFNodeToSpec(ptf, invocation);
+ }
+
+//--------------------------- Windowing handling -----------------------------------
+
+ /*
+  * - A Select Item form is: ^(TOK_SELEXPR selectExpression Identifier* window_specification?)
+  * What makes a UDAF invocation a Windowing Function invocation:
+  * 1. It appears in a SelectExpr that has a WindowSpec
+  * 2. It is a UDAF that implies order (FunctionRegistry.impliesOrder)
+  * 3. It contains lead/lag UDF invocations in its args.
+  *
+  * Returns false when the select item contains no such invocation. Otherwise each
+  * invocation not already recorded for dest is given a "_wcol<N>" alias and added
+  * to the WindowingSpec of dest (created on first use), and true is returned.
+  */
+ private boolean checkAndExtractWindowFunctionsInSelect(QB qb, ASTNode selectExpr, String dest)
+     throws SemanticException {
+
+   // The window specification, when present, is the last child of the select item.
+   ASTNode lastChild = (ASTNode) selectExpr.getChild(selectExpr.getChildCount() - 1);
+   boolean hasWindowSpec = lastChild.getType() == HiveParser.TOK_WINDOWSPEC;
+   ASTNode wdwSpecNode = hasWindowSpec ? lastChild : null;
+
+   // Without an explicit window spec only order-implying / lead-lag UDAFs qualify.
+   ArrayList<ASTNode> wdwFunctions =
+       extractWindowingUDAFs((ASTNode) selectExpr.getChild(0), !hasWindowSpec);
+   if ( wdwFunctions.isEmpty() ) {
+     return false;
+   }
+
+   WindowingSpec wdwSpec = qb.getWindowingSpec(dest);
+   if ( wdwSpec == null ) {
+     queryProperties.setHasWindowing(true);
+     wdwSpec = new WindowingSpec();
+     qb.addDestToWindowingSpec(dest, wdwSpec);
+   }
+
+   // NOTE(review): the known-expression map is looked up once, before the loop;
+   // whether it reflects expressions added inside the loop depends on QBParseInfo.
+   HashMap<String, ASTNode> knownExprs = qb.getParseInfo().getWindowingExprsForClause(dest);
+   int nextColIdx =
+       wdwSpec.getWindowExpressions() == null ? 0 : wdwSpec.getWindowExpressions().size();
+   for (ASTNode fn : wdwFunctions) {
+     WindowFunctionSpec fnSpec = processWindowFunction(fn, wdwSpecNode);
+
+     // A duplicate invocation of a function is not added to the WindowingSpec again.
+     boolean duplicate = knownExprs != null &&
+         knownExprs.containsKey(fnSpec.getExpression().toStringTree());
+     if ( !duplicate ) {
+       fnSpec.setAlias("_wcol" + nextColIdx++);
+       wdwSpec.addWindowFunction(fnSpec);
+       qb.getParseInfo().addWindowingExprToClause(dest, fnSpec.getExpression());
+     }
+   }
+   return true;
+ }
+
+ /*
+  * Return the windowing UDAF invocations found within the expressionTree.
+  * If implyOrder is false, every invocation found is returned.
+  * If implyOrder is true, only those invocations are returned that:
+  * - are for a function for which FunctionRegistry.impliesOrder is true
+  * - or contain a Lead/Lag UDF invocation somewhere in their arguments
+  */
+ private ArrayList<ASTNode> extractWindowingUDAFs(ASTNode expressionTree, boolean implyOrder) {
+   ArrayList<ASTNode> allUDAFs = new ArrayList<ASTNode>();
+   extractWindowingUDAFs(expressionTree, allUDAFs);
+   if (!implyOrder) {
+     return allUDAFs;
+   }
+
+   ArrayList<ASTNode> selected = new ArrayList<ASTNode>();
+   for (ASTNode udaf : allUDAFs) {
+     // child 0 is the function name; the arguments start at child 1
+     String fnName = udaf.getChild(0).getText().toLowerCase();
+     boolean keep = FunctionRegistry.impliesOrder(fnName);
+     for (int argIdx = 1; !keep && argIdx < udaf.getChildCount(); argIdx++) {
+       keep = containsLeadLagUDF((ASTNode) udaf.getChild(argIdx));
+     }
+     if (keep) {
+       selected.add(udaf);
+     }
+   }
+   return selected;
+ }
+
+ /*
+  * Walk the expressionTree depth first and append to aggregations every function
+  * node (TOK_FUNCTION, TOK_FUNCTIONDI or TOK_FUNCTIONSTAR) whose name is an
+  * Identifier for which FunctionRegistry has a WindowFunctionInfo.
+  * The walk does not descend below a node that has been collected.
+  */
+ private void extractWindowingUDAFs(ASTNode expressionTree,
+     ArrayList<ASTNode> aggregations) {
+   boolean isFunctionCall;
+   switch (expressionTree.getToken().getType()) {
+   case HiveParser.TOK_FUNCTION:
+   case HiveParser.TOK_FUNCTIONDI:
+   case HiveParser.TOK_FUNCTIONSTAR:
+     isFunctionCall = true;
+     break;
+   default:
+     isFunctionCall = false;
+     break;
+   }
+
+   if (isFunctionCall) {
+     assert (expressionTree.getChildCount() != 0);
+     if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
+       String fnName = unescapeIdentifier(expressionTree.getChild(0).getText());
+       if (FunctionRegistry.getWindowFunctionInfo(fnName) != null) {
+         aggregations.add(expressionTree);
+         return;
+       }
+     }
+   }
+
+   int childCnt = expressionTree.getChildCount();
+   for (int idx = 0; idx < childCnt; idx++) {
+     extractWindowingUDAFs((ASTNode) expressionTree.getChild(idx), aggregations);
+   }
+ }
+
+ /*
+  * Return true if expressionTree is, or contains anywhere below it, a TOK_FUNCTION
+  * node whose Identifier name (unescaped, lower-cased) equals
+  * FunctionRegistry.LAG_FUNC_NAME or FunctionRegistry.LEAD_FUNC_NAME.
+  */
+ private boolean containsLeadLagUDF(ASTNode expressionTree) {
+   if (expressionTree.getToken().getType() == HiveParser.TOK_FUNCTION) {
+     assert (expressionTree.getChildCount() != 0);
+     if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
+       String fnName =
+           unescapeIdentifier(expressionTree.getChild(0).getText()).toLowerCase();
+       boolean isLeadOrLag = FunctionRegistry.LAG_FUNC_NAME.equals(fnName)
+           || FunctionRegistry.LEAD_FUNC_NAME.equals(fnName);
+       if (isLeadOrLag) {
+         return true;
+       }
+     }
+   }
+
+   // not a lead/lag call itself: search the children, stopping at the first hit
+   boolean found = false;
+   for (int idx = 0; !found && idx < expressionTree.getChildCount(); idx++) {
+     found = containsLeadLagUDF((ASTNode) expressionTree.getChild(idx));
+   }
+   return found;
+ }
+
+ /*
+  * - Invoked during Phase1 when a TOK_SELECT is encountered.
+  * - Select tree form is: ^(TOK_SELECT ^(TOK_SELEXPR...) ^(TOK_SELEXPR...) ...)
+  * - A Select Item form is: ^(TOK_SELEXPR selectExpression Identifier* window_specification?)
+  * - Children of selectNode that are not TOK_SELEXPR nodes are skipped.
+  *
+  * See checkAndExtractWindowFunctionsInSelect for rules on what makes a UDAF invocation
+  * a Windowing Function invocation
+  */
+ private void handleWindowingExprsInSelectList(QB qb, String dest, ASTNode selectNode)
+     throws SemanticException {
+   for(int i=0; i < selectNode.getChildCount(); i++)
+   {
+     ASTNode selectExpr = (ASTNode) selectNode.getChild(i);
+     if ( selectExpr.getType() != HiveParser.TOK_SELEXPR )
+     {
+       continue;
+     }
+     // called for its effect on qb; the boolean result is not needed here
+     checkAndExtractWindowFunctionsInSelect(qb, selectExpr, dest);
+   }
+ }
+
+ /*
+  * Pass every child of node, in order, to processQueryWindowClause together with
+  * the WindowingSpec that qb holds for the destination ctx_1.dest.
+  */
+ private void handleQueryWindowClauses(QB qb, Phase1Ctx ctx_1, ASTNode node)
+     throws SemanticException {
+   WindowingSpec destSpec = qb.getWindowingSpec(ctx_1.dest);
+   int clauseCnt = node.getChildCount();
+   for (int idx = 0; idx < clauseCnt; idx++) {
+     ASTNode windowClause = (ASTNode) node.getChild(idx);
+     processQueryWindowClause(destSpec, windowClause);
+   }
+ }
+
+ /*
+  * Build a PartitionSpec holding one PartitionExpression per child of node,
+  * in child order.
+  */
+ private PartitionSpec processPartitionSpec(ASTNode node) {
+   PartitionSpec partitionSpec = new PartitionSpec();
+   for (int idx = 0, n = node.getChildCount(); idx < n; idx++) {
+     PartitionExpression partExpr = new PartitionExpression();
+     partExpr.setExpression((ASTNode) node.getChild(idx));
+     partitionSpec.addExpression(partExpr);
+   }
+   return partitionSpec;
+ }
+
+ /*
+  * Build an OrderSpec holding one OrderExpression per child of sortNode.
+  * The expression is the first child of each sort column node; the order is ASC
+  * for a TOK_TABSORTCOLNAMEASC node and DESC for any other node type.
+  */
+ private OrderSpec processOrderSpec(ASTNode sortNode) {
+   OrderSpec orderSpec = new OrderSpec();
+   for (int idx = 0, n = sortNode.getChildCount(); idx < n; idx++) {
+     OrderExpression orderExpr = new OrderExpression();
+     orderExpr.setExpression((ASTNode) sortNode.getChild(idx).getChild(0));
+     boolean ascending =
+         sortNode.getChild(idx).getType() == HiveParser.TOK_TABSORTCOLNAMEASC;
+     orderExpr.setOrder(ascending ?
+         org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.ASC :
+         org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.DESC);
+     orderSpec.addExpression(orderExpr);
+   }
+   return orderSpec;
+ }
+
+ /*
+  * Build a PartitioningSpec from a partitioning spec node.
+  * - first child is TOK_DISTRIBUTEBY or TOK_CLUSTERBY: it provides the partition
+  *   spec, and a second child (if any) provides the order spec.
+  * - first child is TOK_SORTBY or TOK_ORDERBY: it provides the order spec only.
+  * - any other first child: an empty PartitioningSpec is returned.
+  */
+ private PartitioningSpec processPTFPartitionSpec(ASTNode pSpecNode)
+ {
+   PartitioningSpec partitioning = new PartitioningSpec();
+   ASTNode firstChild = (ASTNode) pSpecNode.getChild(0);
+   int type = firstChild.getType();
+
+   if ( type == HiveParser.TOK_DISTRIBUTEBY || type == HiveParser.TOK_CLUSTERBY )
+   {
+     PartitionSpec pSpec = processPartitionSpec(firstChild);
+     partitioning.setPartSpec(pSpec);
+     ASTNode sortNode = pSpecNode.getChildCount() > 1 ? (ASTNode) pSpecNode.getChild(1) : null;
+     if ( sortNode != null )
+     {
+       OrderSpec oSpec = processOrderSpec(sortNode);
+       partitioning.setOrderSpec(oSpec);
+     }
+   }
+   else if ( type == HiveParser.TOK_SORTBY || type == HiveParser.TOK_ORDERBY ) {
+     OrderSpec oSpec = processOrderSpec(firstChild);
+     partitioning.setOrderSpec(oSpec);
+   }
+   return partitioning;
+ }
+
+ /*
+  * Convert a function invocation node into a WindowFunctionSpec.
+  * - node: the function node; child 0 is the name, the remaining children are
+  *   the arguments.
+  * - wsNode: the window specification node that applies to it, or null.
+  * A TOK_FUNCTIONDI (distinct) invocation is rejected with a SemanticException.
+  * Side effect: when wsNode is not null it is appended to node as a child.
+  */
+ private WindowFunctionSpec processWindowFunction(ASTNode node, ASTNode wsNode)
+     throws SemanticException {
+   WindowFunctionSpec fnSpec = new WindowFunctionSpec();
+
+   int fnType = node.getType();
+   if (fnType == HiveParser.TOK_FUNCTIONSTAR) {
+     fnSpec.setStar(true);
+   } else if (fnType == HiveParser.TOK_FUNCTIONDI) {
+     fnSpec.setDistinct(true);
+   }
+
+   if ( fnSpec.isDistinct() ) {
+     throw new SemanticException(generateErrorMessage(node,
+         "Count/Sum distinct not supported with Windowing"));
+   }
+
+   fnSpec.setExpression(node);
+   fnSpec.setName(((ASTNode) node.getChild(0)).getText());
+
+   int childCnt = node.getChildCount();
+   for (int argIdx = 1; argIdx < childCnt; argIdx++) {
+     fnSpec.addArg((ASTNode) node.getChild(argIdx));
+   }
+
+   if ( wsNode != null ) {
+     fnSpec.setWindowSpec(processWindowSpec(wsNode));
+
+     /*
+      * In order to distinguish between different invocations of the same UDAF over
+      * different Windows, add the window spec node as a child of the function node.
+      * This is done only after the name and arguments have been read from the
+      * function node above.
+      */
+     node.addChild(wsNode);
+   }
+
+   return fnSpec;
+ }
+
+ /*
+  * Register one named window definition with spec.
+  * Child 0 of node is the window name, child 1 its window specification.
+  * Throws SemanticException if spec already has a window with that name.
+  */
+ private void processQueryWindowClause(WindowingSpec spec, ASTNode node)
+     throws SemanticException {
+   ASTNode nameNode = (ASTNode) node.getChild(0);
+   ASTNode wdwSpecNode = (ASTNode) node.getChild(1);
+   String windowName = nameNode.getText();
+
+   boolean alreadyDefined =
+       spec.getWindowSpecs() != null && spec.getWindowSpecs().containsKey(windowName);
+   if (alreadyDefined) {
+     throw new SemanticException(generateErrorMessage(nameNode,
+         "Duplicate definition of window " + windowName +
+         " is not allowed"));
+   }
+
+   spec.addWindowSpec(windowName, processWindowSpec(wdwSpecNode));
+ }
+
+ /*
+  * Build a WindowSpec from a window specification node. The children are scanned
+  * for three optional components:
+  * - an Identifier: the source id
+  * - a TOK_PARTITIONINGSPEC: the partitioning
+  * - a TOK_WINDOWRANGE or TOK_WINDOWVALUES: the window frame
+  * If a component occurs more than once, the last occurrence is used.
+  * Children of any other type are ignored.
+  */
+ private WindowSpec processWindowSpec(ASTNode node) throws SemanticException {
+   // index of each component among node's children; -1 while not seen
+   int srcIdIdx = -1, partIdx = -1, wfIdx = -1;
+
+   for(int i=0; i < node.getChildCount(); i++)
+   {
+     int type = node.getChild(i).getType();
+     switch(type)
+     {
+     case HiveParser.Identifier:
+       srcIdIdx = i;
+       break;
+     case HiveParser.TOK_PARTITIONINGSPEC:
+       partIdx = i;
+       break;
+     case HiveParser.TOK_WINDOWRANGE:
+     case HiveParser.TOK_WINDOWVALUES:
+       wfIdx = i;
+       break;
+     default:
+       break;
+     }
+   }
+
+   WindowSpec ws = new WindowSpec();
+
+   if (srcIdIdx >= 0) {
+     ASTNode nameNode = (ASTNode) node.getChild(srcIdIdx);
+     ws.setSourceId(nameNode.getText());
+   }
+
+   if (partIdx >= 0) {
+     ASTNode partNode = (ASTNode) node.getChild(partIdx);
+     PartitioningSpec partitioning = processPTFPartitionSpec(partNode);
+     ws.setPartitioning(partitioning);
+   }
+
+   if (wfIdx >= 0) {
+     ASTNode wfNode = (ASTNode) node.getChild(wfIdx);
+     WindowFrameSpec wfSpec = processWindowFrame(wfNode);
+     ws.setWindowFrame(wfSpec);
+   }
+
+   return ws;
+ }
+
+ /*
+  * Build a WindowFrameSpec from a window frame node.
+  * A frame may carry just the start boundary (child 0), or, in the "between"
+  * style, both the start (child 0) and the end (child 1) boundary. When there is
+  * no second child the end boundary is null. The node's own type is passed on to
+  * processBoundary as the frame type.
+  */
+ private WindowFrameSpec processWindowFrame(ASTNode node) throws SemanticException {
+   int frameType = node.getType();
+
+   BoundarySpec startBoundary = processBoundary(frameType, (ASTNode) node.getChild(0));
+   BoundarySpec endBoundary = node.getChildCount() > 1
+       ? processBoundary(frameType, (ASTNode) node.getChild(1))
+       : null;
+
+   return new WindowFrameSpec(startBoundary, endBoundary);
+ }
+
+ /*
+  * Build one boundary of a window frame.
+  * - frameType: type of the enclosing frame node; TOK_WINDOWRANGE gives a
+  *   RangeBoundarySpec, anything else a ValueBoundarySpec.
+  * - node: the boundary node. KW_PRECEDING / KW_FOLLOWING set the direction;
+  *   KW_CURRENT gives a CurrentRowSpec, which carries no amount.
+  * For boundaries with an amount, child 0 of node is either KW_UNBOUNDED or the
+  * amount itself.
+  * Throws SemanticException if the amount is not a valid int or is negative.
+  */
+ private BoundarySpec processBoundary(int frameType, ASTNode node) throws SemanticException {
+   BoundarySpec bs = frameType == HiveParser.TOK_WINDOWRANGE ?
+       new RangeBoundarySpec() : new ValueBoundarySpec();
+   int type = node.getType();
+   boolean hasAmt = true;
+
+   switch(type)
+   {
+   case HiveParser.KW_PRECEDING:
+     bs.setDirection(Direction.PRECEDING);
+     break;
+   case HiveParser.KW_FOLLOWING:
+     bs.setDirection(Direction.FOLLOWING);
+     break;
+   case HiveParser.KW_CURRENT:
+     bs = new CurrentRowSpec();
+     hasAmt = false;
+     break;
+   }
+
+   if ( hasAmt )
+   {
+     ASTNode amtNode = (ASTNode) node.getChild(0);
+     if ( amtNode.getType() == HiveParser.KW_UNBOUNDED)
+     {
+       bs.setAmt(BoundarySpec.UNBOUNDED_AMOUNT);
+     }
+     else
+     {
+       int amt;
+       try {
+         amt = Integer.parseInt(amtNode.getText());
+       } catch (NumberFormatException nfe) {
+         // report a malformed or out-of-range amount as a query error instead of
+         // letting the unchecked exception escape the analyzer
+         throw new SemanticException(generateErrorMessage(amtNode,
+             "Window Frame Boundary Amount must be a +ve integer, amount provided is: "
+             + amtNode.getText()));
+       }
+       if ( amt < 0 ) {
+         throw new SemanticException(
+             "Window Frame Boundary Amount must be a +ve integer, amount provide is: " + amt);
+       }
+       bs.setAmt(amt);
+     }
+   }
+
+   return bs;
+ }
+
+ /*
+  * Visitor that checks if a Select Expr is a constant.
+  * - current logic used is to look for HiveParser.TOK_TABLE_OR_COL
+  * - if there is none then the expression is a constant.
+  * Once a TOK_TABLE_OR_COL node has been seen, later nodes are ignored until
+  * reset() is called.
+  */
+ private static class ConstantExprCheck implements ContextVisitor {
+   boolean isConstant = true;
+
+   public void visit(Object t, Object parent, int childIndex, Map labels) {
+     if ( !isConstant ) {
+       return;
+     }
+     // the node type is read through the tree adaptor, so no cast of t is needed
+     if (ParseDriver.adaptor.getType(t) == HiveParser.TOK_TABLE_OR_COL ) {
+       isConstant = false;
+     }
+   }
+
+   /* make the visitor reusable for the next expression */
+   public void reset() {
+     isConstant = true;
+   }
+
+   protected boolean isConstant() {
+     return isConstant;
+   }
+ }
+
+ /*
+  * Visitor that checks if an expression tree contains one of the given
+  * aggregation expression nodes, i.e. a visited node that is among the values
+  * of destAggrExprs. Once a match has been seen, later nodes are ignored until
+  * reset() is called.
+  */
+ private static class AggregationExprCheck implements ContextVisitor {
+   HashMap<String, ASTNode> destAggrExprs;
+   boolean isAggr = false;
+
+   public AggregationExprCheck(HashMap<String, ASTNode> destAggrExprs) {
+     this.destAggrExprs = destAggrExprs;
+   }
+
+   public void visit(Object t, Object parent, int childIndex, Map labels) {
+     if ( !isAggr && destAggrExprs.values().contains(t) ) {
+       isAggr = true;
+     }
+   }
+
+   /* make the visitor reusable for the next expression */
+   public void reset() {
+     isAggr = false;
+   }
+
+   protected boolean isAggr() {
+     return isAggr;
+   }
+ }
+
+  /*
+   * Returns false if there is a SelectExpr that is not a constant or an aggr.
+   *
+   * - a missing select node, or a TOK_SELECTDI (Select Distinct) node, is always valid.
+   * - children that are not TOK_SELEXPR (e.g. TOK_HINTLIST) are skipped.
+   * - each remaining expression is first checked with ConstantExprCheck; only when
+   *   that check finds a column reference is AggregationExprCheck consulted.
+   */
+  private boolean isValidGroupBySelectList(QB currQB, String clause){
+    ConstantExprCheck constantExprCheck = new ConstantExprCheck();
+    AggregationExprCheck aggrExprCheck = new AggregationExprCheck(
+        currQB.getParseInfo().getAggregationExprsForClause(clause));
+    ASTNode selectNode = currQB.getParseInfo().getSelForClause(clause);
+
+    if (selectNode == null) {
+      return true;
+    }
+
+    /*
+     * for Select Distinct Queries we don't move any aggregations.
+     */
+    if (selectNode.getType() == HiveParser.TOK_SELECTDI) {
+      return true;
+    }
+
+    for (int i = 0; i < selectNode.getChildCount(); i++) {
+      ASTNode selectExpr = (ASTNode) selectNode.getChild(i);
+      //check for TOK_HINTLIST expressions on ast
+      if (selectExpr.getType() != HiveParser.TOK_SELEXPR) {
+        continue;
+      }
+
+      constantExprCheck.reset();
+      PTFTranslator.visit(selectExpr.getChild(0), constantExprCheck);
+      if (constantExprCheck.isConstant()) {
+        continue;
+      }
+
+      aggrExprCheck.reset();
+      PTFTranslator.visit(selectExpr.getChild(0), aggrExprCheck);
+      if (!aggrExprCheck.isAggr()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+//--------------------------- PTF handling: PTFInvocationSpec to PTFDesc --------------------------
+
+  /*
+   * Translates a PTFInvocationSpec into a PTFDesc against the given input
+   * RowResolver, using a fresh PTFTranslator.
+   */
+  private PTFDesc translatePTFInvocationSpec(PTFInvocationSpec ptfQSpec, RowResolver inputRR)
+      throws SemanticException{
+    return new PTFTranslator().translate(ptfQSpec, this, conf, inputRR, unparseTranslator);
+  }
+
+  /*
+   * Splits the PTF invocation into component queries and chains the plan of
+   * each component onto the operator produced by the previous one.
+   * Returns the last operator of the chain.
+   */
+  Operator genPTFPlan(PTFInvocationSpec ptfQSpec, Operator input) throws SemanticException {
+    Operator current = input;
+    for (PTFInvocationSpec component : PTFTranslator.componentize(ptfQSpec)) {
+      current = genPTFPlanForComponentQuery(component, current);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created PTF Plan ");
+    }
+    return current;
+  }
+
+
+ /**
+ * Construct the data structures containing ExprNodeDesc for partition
+ * columns and order columns. Use the input definition to construct the list
+ * of output columns for the ReduceSinkOperator
+ *
+ * All collection arguments are output parameters that this method appends to.
+ *
+ * @param tabDef function definition supplying the partition and order expressions
+ * @param inputRR RowResolver whose columns become the value columns
+ * @param partCols receives the partition expressions
+ * @param valueCols receives one column expression per non-hidden-virtual input column
+ * @param orderCols receives the partition expressions followed by the order expressions
+ * @param colExprMap receives input internal name to value column expression
+ * @param outputColumnNames receives the generated output column names
+ * @param orderString receives one '+' per partition column, then '+' (ASC) or '-'
+ * per order column
+ * @param extractRR receives the RowResolver entries for the Extract operator
+ * @throws SemanticException
+ */
+ void buildPTFReduceSinkDetails(PartitionedTableFunctionDef tabDef,
+ RowResolver inputRR,
+ ArrayList<ExprNodeDesc> partCols,
+ ArrayList<ExprNodeDesc> valueCols,
+ ArrayList<ExprNodeDesc> orderCols,
+ Map<String, ExprNodeDesc> colExprMap,
+ List<String> outputColumnNames,
+ StringBuilder orderString,
+ RowResolver extractRR) throws SemanticException {
+
+ ArrayList<PTFExpressionDef> partColList = tabDef.getPartition().getExpressions();
+
+ for (PTFExpressionDef colDef : partColList) {
+ partCols.add(colDef.getExprNode());
+ orderCols.add(colDef.getExprNode());
+ orderString.append('+');
+ }
+
+ /*
+ * Order columns are used as key columns for constructing
+ * the ReduceSinkOperator
+ * Since we do not explicitly add these to outputColumnNames,
+ * we need to set includeKeyCols = false while creating the
+ * ReduceSinkDesc
+ */
+ ArrayList<OrderExpressionDef> orderColList = tabDef.getOrder().getExpressions();
+ for (int i = 0; i < orderColList.size(); i++) {
+ OrderExpressionDef colDef = orderColList.get(i);
+ org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = colDef.getOrder();
+ if (order.name().equals("ASC")) {
+ orderString.append('+');
+ } else {
+ orderString.append('-');
+ }
+ orderCols.add(colDef.getExprNode());
+ }
+
+ /*
+ * We add the column to value columns and output column names
+ * only if it is not a hidden virtual column.
+ * NOTE(review): colsAddedByHaving is keyed by String[]; arrays compare by
+ * identity, so containsKey only matches when reverseLookup hands back the
+ * same array instance again - confirm that this is what is intended.
+ */
+ ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
+ LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+ new LinkedHashMap<String[], ColumnInfo>();
+ int pos = 0;
+ for (ColumnInfo colInfo : colInfoList) {
+ if (!colInfo.isHiddenVirtualCol()) {
+ ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
+ .getInternalName(), colInfo.getTabAlias(), colInfo
+ .getIsVirtualCol());
+ valueCols.add(valueColExpr);
+ colExprMap.put(colInfo.getInternalName(), valueColExpr);
+ String outColName = SemanticAnalyzer.getColumnInternalName(pos++);
+ outputColumnNames.add(outColName);
+
+ String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+ /*
+ * if we have already encountered this colInfo internalName.
+ * We encounter it again because it must be put for the Having clause.
+ * We will add these entries in the end; in a loop on colsAddedByHaving. See below.
+ * Note that the value column and output name above have already been added
+ * (and pos advanced) before this check skips the RowResolver entry.
+ */
+ if ( colsAddedByHaving.containsKey(alias)) {
+ continue;
+ }
+ ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+ ColumnInfo eColInfo = new ColumnInfo(
+ outColName, colInfo.getType(), alias[0],
+ colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+ if ( astNode == null ) {
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+ else {
+ /*
+ * in case having clause refers to this column may have been added twice;
+ * once with the ASTNode.toStringTree as the alias
+ * and then with the real alias.
+ * The real-alias entry is deferred to the final loop below.
+ */
+ extractRR.putExpression(astNode, eColInfo);
+ if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+ colsAddedByHaving.put(alias, eColInfo);
+ }
+ }
+ }
+ }
+
+ for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
+ String[] alias = columnAddedByHaving.getKey();
+ ColumnInfo eColInfo = columnAddedByHaving.getValue();
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+ }
+
+ /*
+ * Generates the operators for a single component PTF invocation on top of
+ * 'input' and returns the last operator created:
+ * [PTF_map ->] ReduceSink -> Extract -> PTF
+ * The map-side PTF operator is only added when the start of the chain
+ * reports isTransformsRawInput().
+ */
+ private Operator genPTFPlanForComponentQuery(PTFInvocationSpec ptfQSpec, Operator input)
+ throws SemanticException {
+ /*
+ * 1. Create the PTFDesc from the Qspec attached to this QB.
+ */
+ RowResolver rr = opParseCtx.get(input).getRowResolver();
+ PTFDesc ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+ /*
+ * Build an RR for the Extract Op from the ReduceSink Op's RR.
+ * Why?
+ * We need to remove the Virtual Columns present in the RS's RR. The OI
+ * that gets passed to Extract at runtime doesn't contain the Virtual Columns.
+ * So internal names get changed. Consider testCase testJoinWithLeadLag,
+ * which is a self join on part and also has a Windowing expression.
+ * The RR of the RS op at translation time looks something like this:
+ * (_col1,_col2,..,_col7, _col8(vc=true),_col9(vc=true),
+ * _col10,_col11,.._col15(vc=true),_col16(vc=true),..)
+ * At runtime the Virtual columns are removed and all the columns after _col7
+ * are shifted 1 or 2 positions.
+ * So in child Operators ColumnExprNodeDesc's are no longer referring to the right columns.
+ *
+ * So we build a new RR for the Extract Op, with the Virtual Columns removed.
+ * We hand this to the PTFTranslator as the
+ * starting RR to use to translate a PTF Chain.
+ */
+ RowResolver extractOpRR = new RowResolver();
+
+ /*
+ * 2. build Map-side Op Graph. Graph template is either:
+ * Input -> PTF_map -> ReduceSink
+ * or
+ * Input -> ReduceSink
+ *
+ * Here the ExprNodeDescriptors in the QueryDef are based on the Input Operator's RR.
+ */
+ {
+ PartitionedTableFunctionDef tabDef = ptfDesc.getStartOfChain();
+
+ /*
+ * a. add Map-side PTF Operator if needed
+ */
+ if (tabDef.isTransformsRawInput() )
+ {
+ RowResolver ptfMapRR = tabDef.getRawInputShape().getRr();
+
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+ new RowSchema(ptfMapRR.getColumnInfos()),
+ input), ptfMapRR);
+ rr = opParseCtx.get(input).getRowResolver();
+ }
+
+ /*
+ * b. Build Reduce Sink Details (keyCols, valueCols, outColNames etc.) for this ptfDesc.
+ */
+
+ ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ List<String> outputColumnNames = new ArrayList<String>();
+ StringBuilder orderString = new StringBuilder();
+
+ /*
+ * Use the input RR of TableScanOperator in case there is no map-side
+ * reshape of input.
+ * If the parent of ReduceSinkOperator is PTFOperator, use its
+ * output RR.
+ * This call also fills in extractOpRR.
+ */
+ buildPTFReduceSinkDetails(tabDef,
+ rr,
+ partCols,
+ valueCols,
+ orderCols,
+ colExprMap,
+ outputColumnNames,
+ orderString,
+ extractOpRR);
+
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+ .getReduceSinkDesc(orderCols,
+ valueCols, outputColumnNames, false,
+ -1, partCols, orderString.toString(), -1),
+ new RowSchema(rr.getColumnInfos()), input), rr);
+ input.setColumnExprMap(colExprMap);
+ }
+
+ /*
+ * 3. build Reduce-side Op Graph
+ */
+ {
+ /*
+ * b. Construct Extract Operator.
+ */
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new ExtractDesc(
+ new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+ Utilities.ReduceField.VALUE
+ .toString(), "", false)),
+ new RowSchema(extractOpRR.getColumnInfos()),
+ input), extractOpRR);
+
+ /*
+ * c. Rebuild the QueryDef.
+ * Why?
+ * - so that the ExprNodeDescriptors in the QueryDef are based on the
+ * Extract Operator's RowResolver
+ */
+ rr = opParseCtx.get(input).getRowResolver();
+ ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+
+ /*
+ * d. Construct PTF Operator.
+ */
+ RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+ new RowSchema(ptfOpRR.getColumnInfos()),
+ input), ptfOpRR);
+
+ }
+
+ return input;
+
+ }
+
+//--------------------------- Windowing handling: PTFInvocationSpec to PTFDesc --------------------
+
+  /*
+   * Generates the plan for all window function invocations of a query block.
+   * The spec is validated, broken up into groups by a WindowingComponentizer and,
+   * for each group, a ReduceSink/Extract pair followed by a PTF operator is
+   * stacked onto the plan. The output RowResolver of each PTF operator is the
+   * input RowResolver for the next group.
+   */
+  Operator genWindowingPlan(WindowingSpec wSpec, Operator input) throws SemanticException {
+    wSpec.validateAndMakeEffective();
+    WindowingComponentizer componentizer = new WindowingComponentizer(wSpec);
+    RowResolver currentRR = opParseCtx.get(input).getRowResolver();
+
+    while (componentizer.hasNext()) {
+      WindowingSpec groupSpec = componentizer.next(conf, this, unparseTranslator, currentRR);
+      input = genReduceSinkPlanForWindowing(groupSpec, currentRR, input);
+
+      // translate the group against the RowResolver of the Extract operator just added
+      RowResolver extractRR = opParseCtx.get(input).getRowResolver();
+      PTFDesc ptfDesc =
+          new PTFTranslator().translate(groupSpec, this, conf, extractRR, unparseTranslator);
+
+      currentRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+      input = putOpInsertMap(
+          OperatorFactory.getAndMakeChild(ptfDesc,
+              new RowSchema(currentRR.getColumnInfos()),
+              input),
+          currentRR);
+    }
+
+    return input;
+  }
+
+ /*
+ * Adds a ReduceSink operator followed by an Extract operator on top of 'input'
+ * for one group of window functions, and returns the Extract operator.
+ * - key columns: the partition expressions followed by the order expressions.
+ * - order string: '+' per partition expression, then '+' (ASC) or '-' per
+ * order expression.
+ * - value columns: every column of inputRR.
+ */
+ private Operator genReduceSinkPlanForWindowing(WindowingSpec spec,
+ RowResolver inputRR,
+ Operator input) throws SemanticException{
+ ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+ List<String> outputColumnNames = new ArrayList<String>();
+ StringBuilder orderString = new StringBuilder();
+
+ ArrayList<PartitionExpression> partColList = spec.getQueryPartitionSpec().getExpressions();
+ for (PartitionExpression partCol : partColList) {
+ ExprNodeDesc partExpr = genExprNodeDesc(partCol.getExpression(), inputRR);
+ partCols.add(partExpr);
+ orderCols.add(partExpr);
+ orderString.append('+');
+ }
+
+ ArrayList<OrderExpression> orderColList = spec.getQueryOrderSpec() == null ?
+ new ArrayList<PTFInvocationSpec.OrderExpression>() :
+ spec.getQueryOrderSpec().getExpressions();
+ for (int i = 0; i < orderColList.size(); i++) {
+ OrderExpression orderCol = orderColList.get(i);
+ org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = orderCol.getOrder();
+ if (order.name().equals("ASC")) {
+ orderString.append('+');
+ } else {
+ orderString.append('-');
+ }
+ ExprNodeDesc orderExpr = genExprNodeDesc(orderCol.getExpression(), inputRR);
+ orderCols.add(orderExpr);
+ }
+
+ ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
+ RowResolver rsNewRR = new RowResolver();
+ int pos = 0;
+ for (ColumnInfo colInfo : colInfoList) {
+ ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
+ .getInternalName(), colInfo.getTabAlias(), colInfo
+ .getIsVirtualCol());
+ valueCols.add(valueColExpr);
+ colExprMap.put(colInfo.getInternalName(), valueColExpr);
+ String outColName = SemanticAnalyzer.getColumnInternalName(pos++);
+ outputColumnNames.add(outColName);
+
+ String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+ ColumnInfo newColInfo = new ColumnInfo(
+ outColName, colInfo.getType(), alias[0],
+ colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+ rsNewRR.put(alias[0], alias[1], newColInfo);
+
+ }
+
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+ .getReduceSinkDesc(orderCols,
+ valueCols, outputColumnNames, false,
+ -1, partCols, orderString.toString(), -1),
+ new RowSchema(inputRR.getColumnInfos()), input), rsNewRR);
+ input.setColumnExprMap(colExprMap);
+
+
+ // Construct the RR for extract operator
+ RowResolver extractRR = new RowResolver();
+ LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+ new LinkedHashMap<String[], ColumnInfo>();
+ pos = 0;
+
+ for (ColumnInfo colInfo : colInfoList) {
+ String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+ /*
+ * if we have already encountered this colInfo internalName.
+ * We encounter it again because it must be put for the Having clause.
+ * We will add these entries in the end; in a loop on colsAddedByHaving. See below.
+ * NOTE(review): the map is keyed by String[], which compares by identity, so
+ * this only matches when reverseLookup returns the same array instance - confirm.
+ */
+ if ( colsAddedByHaving.containsKey(alias)) {
+ continue;
+ }
+ ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+ ColumnInfo eColInfo = new ColumnInfo(
+ SemanticAnalyzer.getColumnInternalName(pos++), colInfo.getType(), alias[0],
+ colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+ if ( astNode == null ) {
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+ else {
+ /*
+ * in case having clause refers to this column may have been added twice;
+ * once with the ASTNode.toStringTree as the alias
+ * and then with the real alias.
+ * The real-alias entry is deferred to the loop below.
+ */
+ extractRR.putExpression(astNode, eColInfo);
+ if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+ colsAddedByHaving.put(alias, eColInfo);
+ }
+ }
+ }
+
+ for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
+ String[] alias = columnAddedByHaving.getKey();
+ ColumnInfo eColInfo = columnAddedByHaving.getValue();
+ extractRR.put(alias[0], alias[1], eColInfo);
+ }
+
+ /*
+ * Construct Extract Operator.
+ * NOTE(review): the RowSchema here (and for the ReduceSink above) is built from
+ * inputRR's columns while the RowResolver registered for the operator is
+ * extractRR (rsNewRR above) - confirm this is intended.
+ */
+ input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ new ExtractDesc(
+ new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+ Utilities.ReduceField.VALUE
+ .toString(), "", false)),
+ new RowSchema(inputRR.getColumnInfos()),
+ input), extractRR);
+
+
+ return input;
+ }
+
+
+ public static ArrayList<WindowExpressionSpec> parseSelect(String selectExprStr)
+ throws SemanticException
+ {
+ ASTNode selNode = null;
+ try {
+ ParseDriver pd = new ParseDriver();
+ selNode = pd.parseSelect(selectExprStr, null);
+ } catch (ParseException pe) {
+ throw new SemanticException(pe);
+ }
+
+ ArrayList<WindowExpressionSpec> selSpec = new ArrayList<WindowExpressionSpec>();
+ int childCount = selNode.getChildCount();
+ for (int i = 0; i < childCount; i++) {
+ ASTNode selExpr = (ASTNode) selNode.getChild(i);
+ if (selExpr.getType() != HiveParser.TOK_SELEXPR) {
+ throw new SemanticException(String.format(
+ "Only Select expressions supported in dynamic select list: %s", selectExprStr));
+ }
+ ASTNode expr = (ASTNode) selExpr.getChild(0);
+ if (expr.getType() == HiveParser.TOK_ALLCOLREF) {
+ throw new SemanticException(
+ String.format("'%s' column not allowed in dynamic select list", selectExprStr));
+ }
+ ASTNode aliasNode = selExpr.getChildCount() > 1
+ && selExpr.getChild(1).getType() == HiveParser.Identifier ?
+ (ASTNode) selExpr.getChild(1) : null;
+ String alias = null;
+ if ( aliasNode != null ) {
+ alias = aliasNode.getText();
+ }
+ else {
+ String[] tabColAlias = getColAlias(selExpr, null, null, true, -1);
+ alias = tabColAlias[1];
+ }
+ WindowExpressionSpec exprSpec = new WindowExpressionSpec();
+ exprSpec.setAlias(alias);
+ exprSpec.setExpression(expr);
+ selSpec.add(exprSpec);
+ }
+
+ return selSpec;
+ }
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java Tue Apr 2 14:16:34 2013
@@ -492,6 +492,7 @@ public final class TypeCheckProcFactory
static HashMap<Integer, String> specialUnaryOperatorTextHashMap;
static HashMap<Integer, String> specialFunctionTextHashMap;
static HashMap<Integer, String> conversionFunctionTextHashMap;
+ static HashSet<Integer> windowingTokens;
static {
specialUnaryOperatorTextHashMap = new HashMap<Integer, String>();
specialUnaryOperatorTextHashMap.put(HiveParser.PLUS, "positive");
@@ -522,6 +523,22 @@ public final class TypeCheckProcFactory
serdeConstants.TIMESTAMP_TYPE_NAME);
conversionFunctionTextHashMap.put(HiveParser.TOK_DECIMAL,
serdeConstants.DECIMAL_TYPE_NAME);
+
+ windowingTokens = new HashSet<Integer>();
+ windowingTokens.add(HiveParser.KW_OVER);
+ windowingTokens.add(HiveParser.TOK_PARTITIONINGSPEC);
+ windowingTokens.add(HiveParser.TOK_DISTRIBUTEBY);
+ windowingTokens.add(HiveParser.TOK_SORTBY);
+ windowingTokens.add(HiveParser.TOK_CLUSTERBY);
+ windowingTokens.add(HiveParser.TOK_WINDOWSPEC);
+ windowingTokens.add(HiveParser.TOK_WINDOWRANGE);
+ windowingTokens.add(HiveParser.TOK_WINDOWVALUES);
+ windowingTokens.add(HiveParser.KW_UNBOUNDED);
+ windowingTokens.add(HiveParser.KW_PRECEDING);
+ windowingTokens.add(HiveParser.KW_FOLLOWING);
+ windowingTokens.add(HiveParser.KW_CURRENT);
+ windowingTokens.add(HiveParser.TOK_TABSORTCOLNAMEASC);
+ windowingTokens.add(HiveParser.TOK_TABSORTCOLNAMEDESC);
}
private static boolean isRedundantConversionFunction(ASTNode expr,
@@ -865,6 +882,20 @@ public final class TypeCheckProcFactory
ASTNode expr = (ASTNode) nd;
+ /*
+ * A Windowing specification gets added as a child to a UDAF invocation to distinguish it
+ * from similar UDAFs but on different windows.
+ * The UDAF is translated to a WindowFunction invocation in the PTFTranslator.
+ * So here we just return null for tokens that appear in a Window Specification.
+ * When the traversal reaches up to the UDAF invocation its ExprNodeDesc is built using the
+ * ColumnInfo in the InputRR. This is similar to how UDAFs are handled in Select lists.
+ * The difference is that there is no translation for Window related tokens, so we just
+ * return null;
+ */
+ if ( windowingTokens.contains(expr.getType())) {
+ return null;
+ }
+
if (expr.getType() == HiveParser.TOK_TABNAME) {
return null;
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+
+/*
+ * breakup the original WindowingSpec into a set of WindowingSpecs.
+ * Each WindowingSpec is executed in an instance of PTFOperator,
+ * preceded by ReduceSink and Extract.
+ * The logic to componentize is straightforward:
+ * - distribute Window Fn. Specs from original Window Spec into a set of WindowSpecs,
+ * based on their Partitioning.
+ * - A Group of WindowSpecs, is a subset of the Window Fn Invocations in the QueryBlock that
+ * have the same Partitioning(Partition + Order spec).
+ * - Each Group is put in a new WindowingSpec and is evaluated in its own PTFOperator instance.
+ * - the order of computation is then inferred based on the dependency between Groups.
+ * If 2 groups have the same dependency, then the Group with the function that is
+ * earliest in the SelectList is executed first.
+ */
+public class WindowingComponentizer {
+
+  WindowingSpec originalSpec;
+  // groups in the order in which their first function appears in the original spec
+  LinkedHashMap<PartitioningSpec, WindowingSpec> groups;
+
+  /**
+   * Groups the window functions of the given spec by their PartitioningSpec.
+   *
+   * @param originalSpec the WindowingSpec to break up
+   * @throws SemanticException declared by the grouping step
+   */
+  public WindowingComponentizer(WindowingSpec originalSpec) throws SemanticException {
+    super();
+    this.originalSpec = originalSpec;
+    groups = new LinkedHashMap<PartitioningSpec, WindowingSpec>();
+    groupFunctions();
+  }
+
+  /*
+   * Puts every window function of the original spec into the WindowingSpec
+   * of its partitioning, creating that WindowingSpec on first use.
+   */
+  private void groupFunctions() throws SemanticException {
+    for (WindowExpressionSpec expr : originalSpec.getWindowExpressions()) {
+      WindowFunctionSpec wFn = (WindowFunctionSpec) expr;
+      PartitioningSpec wFnGrp = wFn.getWindowSpec().getPartitioning();
+      WindowingSpec wSpec = groups.get(wFnGrp);
+      if (wSpec == null) {
+        wSpec = new WindowingSpec();
+        groups.put(wFnGrp, wSpec);
+      }
+      wSpec.addWindowFunction(wFn);
+    }
+  }
+
+  /**
+   * @return true while there are groups that have not been handed out by next()
+   */
+  public boolean hasNext() {
+    return !groups.isEmpty();
+  }
+
+  /**
+   * Returns the first remaining group that can be translated against the given
+   * input RowResolver, and removes it from the pending groups.
+   *
+   * A group that fails translation is left in place and the next one is tried;
+   * the last failure is reported if no group can be translated.
+   *
+   * @param hCfg configuration passed to the translator
+   * @param semAly the semantic analyzer passed to the translator
+   * @param unparseT the unparse translator passed to the translator
+   * @param inputRR RowResolver the group must be translatable against
+   * @return the WindowingSpec of the selected group
+   * @throws SemanticException if no remaining group can be translated; the last
+   *         translation failure, if any, is attached as the cause
+   */
+  public WindowingSpec next(HiveConf hCfg,
+      SemanticAnalyzer semAly,
+      UnparseTranslator unparseT,
+      RowResolver inputRR) throws SemanticException {
+
+    SemanticException lastFailure = null;
+
+    Iterator<Map.Entry<PartitioningSpec, WindowingSpec>> grpIt = groups.entrySet().iterator();
+    while (grpIt.hasNext()) {
+      WindowingSpec wSpec = grpIt.next().getValue();
+      try {
+        PTFTranslator t = new PTFTranslator();
+        t.translate(wSpec, semAly, hCfg, inputRR, unparseT);
+      } catch (SemanticException se) {
+        // this group cannot be translated against inputRR; try the next one
+        lastFailure = se;
+        continue;
+      }
+      // remove through the iterator so removal does not depend on a key lookup
+      grpIt.remove();
+      return wSpec;
+    }
+
+    // lastFailure is null when next() is called with no groups left
+    String underlying = lastFailure == null ?
+        "no window function groups remain" : lastFailure.getMessage();
+    throw new SemanticException("Failed to breakup Windowing invocations into Groups. " +
+        "At least 1 group must only depend on input columns. " +
+        "Also check for circular dependencies.\n" +
+        "Underlying error: " + underlying, lastFailure);
+  }
+
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.PTFTranslator.LeadLagInfo;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLag;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLead;
+
+/*
+ * When constructing the Evaluator Tree from an ExprNode Tree
+ * - look for any descendant LeadLag Function Expressions
+ * - if they are found:
+ * - add them to the LLInfo.leadLagExprs and
+ * - add a mapping from the Expr Tree root to the LLFunc Expr in LLInfo.mapTopExprToLLFunExprs
+ */
+public class WindowingExprNodeEvaluatorFactory {
+
+ public static ExprNodeEvaluator get(LeadLagInfo llInfo, ExprNodeDesc desc) throws HiveException
+ {
+ FindLeadLagFuncExprs visitor = new FindLeadLagFuncExprs(llInfo, desc);
+ new ExprNodeWalker(visitor).walk(desc);
+ return ExprNodeEvaluatorFactory.get(desc);
+ }
+
+ public static class FindLeadLagFuncExprs
+ {
+ ExprNodeDesc topExpr;
+ LeadLagInfo llInfo;
+
+ FindLeadLagFuncExprs(LeadLagInfo llInfo, ExprNodeDesc topExpr)
+ {
+ this.llInfo = llInfo;
+ this.topExpr = topExpr;
+ }
+
+ public void visit(ExprNodeGenericFuncDesc fnExpr) throws HiveException
+ {
+ GenericUDF fn = fnExpr.getGenericUDF();
+ if (fn instanceof GenericUDFLead || fn instanceof GenericUDFLag )
+ {
+ llInfo.addLLFuncExprForTopExpr(topExpr, fnExpr);
+ }
+ }
+ }
+
+ static class ExprNodeWalker
+ {
+ FindLeadLagFuncExprs visitor;
+
+ public ExprNodeWalker(FindLeadLagFuncExprs visitor)
+ {
+ super();
+ this.visitor = visitor;
+ }
+
+ public void walk(ExprNodeDesc e) throws HiveException
+ {
+ if ( e == null ) {
+ return;
+ }
+ List<ExprNodeDesc> children = e.getChildren();
+ if ( children != null )
+ {
+ for(ExprNodeDesc child : children)
+ {
+ walk(child);
+ }
+ }
+
+ if ( e instanceof ExprNodeGenericFuncDesc)
+ {
+ visitor.visit((ExprNodeGenericFuncDesc)e);
+ }
+ }
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Tue Apr 2 14:16:34 2013
@@ -0,0 +1,719 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.antlr.runtime.CommonToken;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+
+/*
+ * Captures the Window processing specified in a Query. A Query may
+ * contain:
+ * - UDAF invocations on a Window.
+ * - Lead/Lag function invocations that can only be evaluated in a
+ * Partition.
+ * - For Queries that don't have a Group By all UDAF invocations are
+ * treated as Window Function invocations.
+ * - For Queries that don't have a Group By, the Having condition is
+ * handled as a post processing on the rows output by Windowing
+ * processing.
+ * Windowing is a container of all the Select Expressions that are
+ * to be handled by Windowing. These are held in 2 lists: the functions
+ * list holds WindowFunction invocations; the expressions list holds
+ * Select Expressions having Lead/Lag function calls. It may also
+ * contain an ASTNode representing the post filter to apply on the
+ * output of Window Functions.
+ * Windowing also contains all the Windows defined in the Query. One of
+ * the Windows is designated as the 'default' Window. If the Query has a
+ * Distribute By/Cluster By clause; then the information in these
+ * clauses is captured as a Partitioning and used as the default Window
+ * for the Query. Otherwise the first Window specified is treated as the
+ * default.
+ * Finally Windowing maintains a Map from an 'alias' to the ASTNode that
+ * represents the Select Expression that was translated to a Window
+ * Function invocation or a Window Expression. This is used when
+ * building RowResolvers.
+ */
+public class WindowingSpec {
+ HashMap<String, WindowExpressionSpec> aliasToWdwExpr;
+ HashMap<String, WindowSpec> windowSpecs;
+ ArrayList<WindowExpressionSpec> windowExpressions;
+
+  /**
+   * Registers a named Window definition. The backing map is created on first use; a
+   * second registration under the same name replaces the first.
+   *
+   * @param name name the Window can be referred to by
+   * @param wdwSpec the Window definition
+   */
+  public void addWindowSpec(String name, WindowSpec wdwSpec) {
+    if (windowSpecs == null) {
+      windowSpecs = new HashMap<String, WindowSpec>();
+    }
+    windowSpecs.put(name, wdwSpec);
+  }
+
+  /**
+   * Wraps {@code expr} in a new {@link WindowExpressionSpec} carrying {@code alias},
+   * appends it to the expression list and indexes it by alias. Both collections are
+   * created on first use.
+   *
+   * @param expr the Select Expression
+   * @param alias alias under which the expression is recorded
+   */
+  public void addExpression(ASTNode expr, String alias) {
+    if (windowExpressions == null) {
+      windowExpressions = new ArrayList<WindowExpressionSpec>();
+    }
+    if (aliasToWdwExpr == null) {
+      aliasToWdwExpr = new HashMap<String, WindowExpressionSpec>();
+    }
+
+    WindowExpressionSpec spec = new WindowExpressionSpec();
+    spec.setAlias(alias);
+    spec.setExpression(expr);
+
+    windowExpressions.add(spec);
+    aliasToWdwExpr.put(alias, spec);
+  }
+
+  /**
+   * Appends a Window Function invocation to the expression list and indexes it by its
+   * own alias. Both collections are created on first use.
+   *
+   * @param wFn the function invocation to record
+   */
+  public void addWindowFunction(WindowFunctionSpec wFn) {
+    if (windowExpressions == null) {
+      windowExpressions = new ArrayList<WindowExpressionSpec>();
+    }
+    if (aliasToWdwExpr == null) {
+      aliasToWdwExpr = new HashMap<String, WindowExpressionSpec>();
+    }
+
+    windowExpressions.add(wFn);
+    aliasToWdwExpr.put(wFn.getAlias(), wFn);
+  }
+
+  /**
+   * @return the alias-to-expression map itself (not a copy); null until an expression
+   *         or function has been added, or a map has been set
+   */
+  public HashMap<String, WindowExpressionSpec> getAliasToWdwExpr() {
+    return this.aliasToWdwExpr;
+  }
+
+  /**
+   * Replaces the alias-to-expression map; the given map is stored as is, not copied.
+   *
+   * @param aliasToWdwExpr the new map, may be null
+   */
+  public void setAliasToWdwExpr(HashMap<String, WindowExpressionSpec> aliasToWdwExpr) {
+    this.aliasToWdwExpr = aliasToWdwExpr;
+  }
+
+  /**
+   * @return the map of named Window definitions itself (not a copy); null until a
+   *         Window has been added or a map has been set
+   */
+  public HashMap<String, WindowSpec> getWindowSpecs() {
+    return this.windowSpecs;
+  }
+
+  /**
+   * Replaces the map of named Window definitions; the given map is stored as is.
+   *
+   * @param windowSpecs the new map, may be null
+   */
+  public void setWindowSpecs(HashMap<String, WindowSpec> windowSpecs) {
+    this.windowSpecs = windowSpecs;
+  }
+
+  /**
+   * @return the list of recorded expressions and function invocations itself (not a
+   *         copy), in insertion order; null until something has been added or a list
+   *         has been set
+   */
+  public ArrayList<WindowExpressionSpec> getWindowExpressions() {
+    return this.windowExpressions;
+  }
+
+  /**
+   * Replaces the list of recorded expressions; the given list is stored as is.
+   *
+   * @param windowExpressions the new list, may be null
+   */
+  public void setWindowExpressions(ArrayList<WindowExpressionSpec> windowExpressions) {
+    this.windowExpressions = windowExpressions;
+  }
+
+  /**
+   * Returns the Partitioning of the Window attached to the first recorded expression.
+   *
+   * @return the Partitioning of the first Window Function's Window
+   */
+  public PartitioningSpec getQueryPartitioningSpec() {
+    /*
+     * Deliberately no null or class checks:
+     * with the new design a WindowingSpec must contain a WindowFunctionSpec.
+     * todo: cleanup datastructs.
+     */
+    WindowExpressionSpec first = getWindowExpressions().get(0);
+    WindowSpec firstWindow = ((WindowFunctionSpec) first).getWindowSpec();
+    return firstWindow.getPartitioning();
+  }
+
+ public PartitionSpec getQueryPartitionSpec() {
+ return getQueryPartitioningSpec().getPartSpec();
+ }
+
+ public OrderSpec getQueryOrderSpec() {
+ return getQueryPartitioningSpec().getOrderSpec();
+ }
+
+ /*
+ * Apply the rules in the Spec. to fill in any missing pieces of every Window Specification,
+ * also validate that the effective Specification is valid. The rules applied are:
+ * - For Wdw Specs that refer to Window Defns, inherit missing components.
+ * - A Window Spec with no Partition Spec is partitioned on a constant (number 0)
+ * - For missing Wdw Frames or for Frames with only a Start Boundary, completely specify them
+ * by the rules in {@link effectiveWindowFrame}
+ * - Validate the effective Window Frames with the rules in {@link validateWindowFrame}
+ * - If there is no Order, then add the Partition expressions as the Order.
+ */
+  /**
+   * Completes and validates the Window of every recorded expression, in list order.
+   * Every entry is cast to {@link WindowFunctionSpec}.
+   *
+   * @throws SemanticException if a Window reference cannot be resolved or the
+   *         effective Window Frame is invalid
+   */
+  public void validateAndMakeEffective() throws SemanticException {
+    for (WindowExpressionSpec wExpr : getWindowExpressions()) {
+      WindowFunctionSpec fnSpec = (WindowFunctionSpec) wExpr;
+      WindowSpec spec = fnSpec.getWindowSpec();
+
+      if (spec == null) {
+        // No Window given at all: start from an empty one and attach it to the function.
+        spec = new WindowSpec();
+        fnSpec.setWindowSpec(spec);
+      } else {
+        // 1. Inherit missing components from the Window definitions this Window refers to.
+        fillInWindowSpec(spec.getSourceId(), spec, new ArrayList<String>());
+      }
+
+      // 2. A Window with no Partition Spec is partitioned on a constant (number 0).
+      applyContantPartition(spec);
+
+      // 3. Complete a missing Frame, or a Frame that only has a start boundary.
+      effectiveWindowFrame(fnSpec, spec);
+
+      // 4. Validate the now complete Frame.
+      validateWindowFrame(spec);
+
+      // 5. With no Order given, the Partition expressions become the Order.
+      spec.ensureOrderSpec();
+    }
+  }
+
+  /**
+   * Follows the chain of source Window references that starts at {@code sourceId} and
+   * copies into {@code dest} each of Partition, Order and Window Frame that {@code dest}
+   * does not have yet. The nearest source that defines a component wins, because a
+   * component is only copied while it is still missing.
+   *
+   * A reference is reported as unknown when no named Windows are registered at all,
+   * when the name is not registered, or when it resolves to {@code dest} itself.
+   *
+   * @param sourceId name of the first source Window; null means nothing to inherit
+   * @param dest the Window being completed
+   * @param visited names already followed; used for cycle detection and appended to
+   * @throws SemanticException on an unknown source or a cycle in the references
+   */
+  private void fillInWindowSpec(String sourceId, WindowSpec dest, ArrayList<String> visited)
+    throws SemanticException
+  {
+    // The registry is only created by addWindowSpec, so it is null for a query that
+    // defines no named Windows; treat that like any other unknown name.
+    HashMap<String, WindowSpec> definedWindows = getWindowSpecs();
+
+    String currentId = sourceId;
+    while (currentId != null) {
+      if (visited.contains(currentId)) {
+        visited.add(currentId);
+        throw new SemanticException(String.format("Cycle in Window references %s", visited));
+      }
+
+      WindowSpec source = definedWindows == null ? null : definedWindows.get(currentId);
+      if (source == null || source.equals(dest)) {
+        throw new SemanticException(String.format("Window Spec %s refers to an unknown source " ,
+            dest));
+      }
+
+      if (dest.getPartition() == null) {
+        dest.setPartition(source.getPartition());
+      }
+      if (dest.getOrder() == null) {
+        dest.setOrder(source.getOrder());
+      }
+      if (dest.getWindowFrame() == null) {
+        dest.setWindowFrame(source.getWindowFrame());
+      }
+
+      visited.add(currentId);
+      currentId = source.getSourceId();
+    }
+  }
+
+  /**
+   * Gives {@code wdwSpec} a Partition on the Number literal "0" when it has no
+   * Partition; does nothing otherwise.
+   *
+   * @param wdwSpec the Window to complete
+   */
+  private void applyContantPartition(WindowSpec wdwSpec) {
+    if (wdwSpec.getPartition() != null) {
+      return;
+    }
+
+    PartitionExpression constantExpr = new PartitionExpression();
+    constantExpr.setExpression(new ASTNode(new CommonToken(HiveParser.Number, "0")));
+
+    PartitionSpec constantPartition = new PartitionSpec();
+    constantPartition.addExpression(constantExpr);
+    wdwSpec.setPartition(constantPartition);
+  }
+
+ /*
+ * - A Window Frame that has only the start boundary is interpreted as:
+ BETWEEN <start boundary> AND CURRENT ROW
+ * - A Window Specification with an Order Specification and no Window
+ * Frame is interpreted as:
+ ROW BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ * - A Window Specification with no Order and no Window Frame is interpreted as:
+ ROW BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+ */
+  /**
+   * Makes the Window Frame of {@code wdwSpec} complete.
+   * - An existing Frame without an end boundary gets "current row" as its end.
+   * - A missing Frame becomes unbounded preceding .. unbounded following (range
+   *   boundaries) when the function is registered as not supporting windows or when
+   *   there is no Order; otherwise it becomes unbounded preceding (value boundary) ..
+   *   current row.
+   * A function the registry has no window info for is treated as supporting windows.
+   *
+   * @param wFn the function invocation; only its name is used, for the registry lookup
+   * @param wdwSpec the Window whose Frame is completed in place
+   */
+  private void effectiveWindowFrame(WindowFunctionSpec wFn, WindowSpec wdwSpec) {
+
+    WindowFunctionInfo fnInfo = FunctionRegistry.getWindowFunctionInfo(wFn.getName());
+    boolean windowable = fnInfo == null || fnInfo.isSupportsWindow();
+    WindowFrameSpec frame = wdwSpec.getWindowFrame();
+    OrderSpec order = wdwSpec.getOrder();
+
+    if (frame != null) {
+      if (frame.getEnd() == null) {
+        frame.setEnd(new CurrentRowSpec());
+      }
+      return;
+    }
+
+    BoundarySpec start;
+    BoundarySpec end;
+    if (!windowable || order == null) {
+      start = new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT);
+      end = new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT);
+    } else {
+      start = new ValueBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT);
+      end = new CurrentRowSpec();
+    }
+    wdwSpec.setWindowFrame(new WindowFrameSpec(start, end));
+  }
+
+  /**
+   * Validates the (already complete) Window Frame of {@code wdwSpec}:
+   * - the start boundary must not be UNBOUNDED FOLLOWING;
+   * - the end boundary must not be UNBOUNDED PRECEDING;
+   * - value boundaries are checked against, and bound to, the Order expression.
+   *
+   * The end check looks at the amount of the end boundary. It previously looked at the
+   * amount of the start boundary, which let "... AND UNBOUNDED PRECEDING" through and
+   * rejected valid frames such as "UNBOUNDED PRECEDING AND 2 PRECEDING".
+   *
+   * @param wdwSpec Window whose Frame has both a start and an end boundary
+   * @throws SemanticException if one of the rules above is violated
+   */
+  private void validateWindowFrame(WindowSpec wdwSpec) throws SemanticException {
+    WindowFrameSpec wFrame = wdwSpec.getWindowFrame();
+    BoundarySpec start = wFrame.getStart();
+    BoundarySpec end = wFrame.getEnd();
+
+    if ( start.getDirection() == Direction.FOLLOWING &&
+        start.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT ) {
+      throw new SemanticException("Start of a WindowFrame cannot be UNBOUNDED FOLLOWING");
+    }
+
+    if ( end.getDirection() == Direction.PRECEDING &&
+        end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT ) {
+      throw new SemanticException("End of a WindowFrame cannot be UNBOUNDED PRECEDING");
+    }
+
+    validateValueBoundary(start, wdwSpec.getOrder());
+    validateValueBoundary(end, wdwSpec.getOrder());
+  }
+
+  /**
+   * For a value based boundary, checks that the Window has exactly one Order
+   * expression and records that expression on the boundary. Other boundary kinds are
+   * left untouched.
+   *
+   * A missing or empty Order is reported as a SemanticException; it used to surface as
+   * a NullPointerException or IndexOutOfBoundsException.
+   *
+   * @param bs the boundary to check
+   * @param order the Order of the Window, may be null
+   * @throws SemanticException if {@code bs} is value based and the Window does not
+   *         have exactly one Order expression
+   */
+  private void validateValueBoundary(BoundarySpec bs, OrderSpec order) throws SemanticException {
+    if ( !(bs instanceof ValueBoundarySpec) ) {
+      return;
+    }
+    ValueBoundarySpec vbs = (ValueBoundarySpec) bs;
+
+    if ( order == null || order.getExpressions() == null
+        || order.getExpressions().size() == 0 ) {
+      throw new SemanticException("Range based Window Frame needs to specify ORDER BY clause");
+    }
+    if ( order.getExpressions().size() > 1 ) {
+      throw new SemanticException("Range based Window Frame can have only 1 Sort Key");
+    }
+
+    vbs.setExpression(order.getExpressions().get(0).getExpression());
+  }
+
+ /*
+ * Represents a Select Expression in the context of Windowing. These can
+ * refer to the output of Windowing Functions and can navigate the
+ * Partition using Lead/Lag functions.
+ */
+  public static class WindowExpressionSpec {
+    String alias;
+    ASTNode expression;
+
+    /** @return the alias of this Select Expression, may be null */
+    public String getAlias() {
+      return this.alias;
+    }
+
+    /** @param alias the alias to record for this Select Expression */
+    public void setAlias(String alias) {
+      this.alias = alias;
+    }
+
+    /** @return the AST of this Select Expression, may be null */
+    public ASTNode getExpression() {
+      return this.expression;
+    }
+
+    /** @param expression the AST of this Select Expression */
+    public void setExpression(ASTNode expression) {
+      this.expression = expression;
+    }
+  }
+
+ /*
+ * Represents a UDAF invocation in the context of a Window Frame. As
+ * explained above sometimes UDAFs will be handled as Window Functions
+ * even w/o an explicit Window specification. This is to support Queries
+ * that have no Group By clause. A Window Function invocation captures:
+ * - the ASTNode that represents this invocation
+ * - its name
+ * - whether it is star/distinct invocation.
+ * - its alias
+ * - and an optional Window specification
+ */
+ public static class WindowFunctionSpec extends WindowExpressionSpec
+ {
+ String name;
+ boolean isStar;
+ boolean isDistinct;
+ ArrayList<ASTNode> args;
+ WindowSpec windowSpec;
+
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public boolean isStar() {
+ return isStar;
+ }
+ public void setStar(boolean isStar) {
+ this.isStar = isStar;
+ }
+ public boolean isDistinct() {
+ return isDistinct;
+ }
+ public void setDistinct(boolean isDistinct) {
+ this.isDistinct = isDistinct;
+ }
+ public ArrayList<ASTNode> getArgs() {
+ args = args == null ? new ArrayList<ASTNode>() : args;
+ return args;
+ }
+ public void setArgs(ArrayList<ASTNode> args) {
+ this.args = args;
+ }
+ public void addArg(ASTNode arg) {
+ args = args == null ? new ArrayList<ASTNode>() : args;
+ args.add((ASTNode)arg);
+ }
+ public WindowSpec getWindowSpec() {
+ return windowSpec;
+ }
+ public void setWindowSpec(WindowSpec windowSpec) {
+ this.windowSpec = windowSpec;
+ }
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(name).append("(");
+ if (isStar )
+ {
+ buf.append("*");
+ }
+ else
+ {
+ if ( isDistinct )
+ {
+ buf.append("distinct ");
+ }
+ if ( args != null )
+ {
+ boolean first = true;
+ for(ASTNode arg : args)
+ {
+ if ( first) {
+ first = false;
+ } else {
+ buf.append(", ");
+ }
+ buf.append(arg.toStringTree());
+ }
+ }
+ }
+
+ buf.append(")");
+
+ if ( windowSpec != null )
+ {
+ buf.append(" ").append(windowSpec.toString());
+ }
+
+ if ( alias != null )
+ {
+ buf.append(" as ").append(alias);
+ }
+
+ return buf.toString();
+ }
+
+ }
+
+ /*
+ * It represents a WindowFrame applied to a Partitioning. A Window can
+ * refer to a <i>source</i> Window by name. The source Window provides the
+ * basis for this Window definition. This Window specification
+ * extends/overrides the <i>source</i> Window definition. In our e.g. the
+ * Select Expression $sum(p_retailprice) over (w1)$ is translated into a
+ * WindowFunction instance that has a Window specification that refers
+ * to the global Window Specification 'w1'. The Function's specification
+ * has no content, but inherits all its attributes from 'w1' during
+ * subsequent phases of translation.
+ */
+  public static class WindowSpec
+  {
+    String sourceId;
+    PartitioningSpec partitioning;
+    WindowFrameSpec windowFrame;
+
+    /** @return the name of the source Window this one is based on, may be null */
+    public String getSourceId() {
+      return this.sourceId;
+    }
+
+    /** @param sourceId the name of the source Window */
+    public void setSourceId(String sourceId) {
+      this.sourceId = sourceId;
+    }
+
+    /** @return the Partitioning (Partition plus Order), may be null */
+    public PartitioningSpec getPartitioning() {
+      return this.partitioning;
+    }
+
+    /** @param partitioning the Partitioning, stored as is */
+    public void setPartitioning(PartitioningSpec partitioning) {
+      this.partitioning = partitioning;
+    }
+
+    /** @return the Window Frame, may be null */
+    public WindowFrameSpec getWindowFrame() {
+      return this.windowFrame;
+    }
+
+    /** @param windowFrame the Window Frame */
+    public void setWindowFrame(WindowFrameSpec windowFrame) {
+      this.windowFrame = windowFrame;
+    }
+
+    /** @return the Partition of the Partitioning, or null when there is no Partitioning */
+    public PartitionSpec getPartition() {
+      PartitioningSpec current = getPartitioning();
+      if (current == null) {
+        return null;
+      }
+      return current.getPartSpec();
+    }
+
+    /**
+     * Sets the Partition, creating the Partitioning first when there is none.
+     *
+     * @param partSpec the Partition
+     */
+    public void setPartition(PartitionSpec partSpec) {
+      if (partitioning == null) {
+        partitioning = new PartitioningSpec();
+      }
+      partitioning.setPartSpec(partSpec);
+    }
+
+    /** @return the Order of the Partitioning, or null when there is no Partitioning */
+    public OrderSpec getOrder() {
+      PartitioningSpec current = getPartitioning();
+      if (current == null) {
+        return null;
+      }
+      return current.getOrderSpec();
+    }
+
+    /**
+     * Sets the Order, creating the Partitioning first when there is none.
+     *
+     * @param orderSpec the Order
+     */
+    public void setOrder(OrderSpec orderSpec) {
+      if (partitioning == null) {
+        partitioning = new PartitioningSpec();
+      }
+      partitioning.setOrderSpec(orderSpec);
+    }
+
+    /*
+     * With no Order specified, the Partition expressions are installed as the Order
+     * expressions. This is an implementation artifact: UDAFs that imply order (like
+     * rank, dense_rank) depend on the Order Expressions to work, and internally the
+     * Order Expressions are passed as Args to these functions. The translation could
+     * instead set the Functions up with Partition expressions when the OrderSpec is
+     * null; for now an OrderSpec that copies the Partition expressions is set up.
+     */
+    protected void ensureOrderSpec() {
+      if (getOrder() != null) {
+        return;
+      }
+      OrderSpec derivedOrder = new OrderSpec();
+      derivedOrder.prefixBy(getPartition());
+      setOrder(derivedOrder);
+    }
+  }
+
+ /*
+ * A WindowFrame specifies the Range on which a Window Function should
+ * be applied for the 'current' row. It is specified by a <i>start</i> and
+ * <i>end</i> Boundary.
+ */
+ public static class WindowFrameSpec
+ {
+ BoundarySpec start;
+ BoundarySpec end;
+
+ public WindowFrameSpec() {
+ }
+
+ public WindowFrameSpec(BoundarySpec start, BoundarySpec end)
+ {
+ super();
+ this.start = start;
+ this.end = end;
+ }
+
+ public WindowFrameSpec(BoundarySpec start)
+ {
+ this(start, null);
+ }
+
+ public BoundarySpec getStart()
+ {
+ return start;
+ }
+
+ public void setStart(BoundarySpec start)
+ {
+ this.start = start;
+ }
+
+ public BoundarySpec getEnd()
+ {
+ return end;
+ }
+
+ public void setEnd(BoundarySpec end)
+ {
+ this.end = end;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("window(start=%s, end=%s)", start, end);
+ }
+
+ }
+
+  /**
+   * Side of the current row a Boundary lies on. Boundaries are compared through
+   * {@code Direction.compareTo}, so the declaration order PRECEDING, CURRENT, FOLLOWING
+   * is significant.
+   */
+  public static enum Direction
+  {
+    PRECEDING, CURRENT, FOLLOWING
+  }
+
+ /*
+ * A Boundary specifies how many rows back/forward a WindowFrame extends from the
+ * current row. A Boundary is specified as:
+ * - Range Boundary :: as the number of rows to go forward or back from
+ the Current Row.
+ * - Current Row :: which implies the Boundary is at the current row.
+ * - Value Boundary :: which is specified as the amount the value of an
+ Expression must decrease/increase
+ */
+  public abstract static class BoundarySpec implements Comparable<BoundarySpec>
+  {
+    /**
+     * Sentinel amount that stands for an unbounded boundary. It is compared with
+     * {@code ==} throughout, so it is a true constant and must never be reassigned.
+     */
+    public static final int UNBOUNDED_AMOUNT = Integer.MAX_VALUE;
+
+    /** @return the side of the current row this boundary lies on */
+    public abstract Direction getDirection();
+
+    /** @param dir the side of the current row this boundary lies on */
+    public abstract void setDirection(Direction dir);
+
+    /** @param amt the boundary amount, or {@link #UNBOUNDED_AMOUNT} */
+    public abstract void setAmt(int amt);
+
+    /** @return the boundary amount, or {@link #UNBOUNDED_AMOUNT} */
+    public abstract int getAmt();
+  }
+
+  public static class RangeBoundarySpec extends BoundarySpec
+  {
+
+    Direction direction;
+    int amt;
+
+    /** Creates a boundary with no direction and amount 0. */
+    public RangeBoundarySpec() {
+    }
+
+    /**
+     * @param direction the side of the current row the boundary lies on
+     * @param amt the amount, or {@code UNBOUNDED_AMOUNT}
+     */
+    public RangeBoundarySpec(Direction direction, int amt)
+    {
+      super();
+      this.direction = direction;
+      this.amt = amt;
+    }
+
+    @Override
+    public Direction getDirection()
+    {
+      return direction;
+    }
+
+    @Override
+    public void setDirection(Direction direction)
+    {
+      this.direction = direction;
+    }
+
+    @Override
+    public int getAmt()
+    {
+      return amt;
+    }
+
+    @Override
+    public void setAmt(int amt)
+    {
+      this.amt = amt;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("range(%s %s)", (amt == UNBOUNDED_AMOUNT ? "Unbounded" : amt),
+          direction);
+    }
+
+    /**
+     * Orders boundaries by direction first and by amount second.
+     *
+     * The other amount is read through {@code getAmt()}, so comparing against another
+     * kind of boundary with the same direction no longer fails with a
+     * ClassCastException. The amounts are compared explicitly instead of subtracted,
+     * which cannot overflow. Only the sign of the result is meaningful.
+     *
+     * @param other the boundary to compare with
+     * @return a negative value, zero or a positive value as this boundary sorts before,
+     *         equal to or after {@code other}
+     */
+    public int compareTo(BoundarySpec other)
+    {
+      int c = direction.compareTo(other.getDirection());
+      if (c != 0) {
+        return c;
+      }
+      int otherAmt = other.getAmt();
+      if (amt == otherAmt) {
+        return 0;
+      }
+      return amt < otherAmt ? -1 : 1;
+    }
+
+  }
+
+  public static class CurrentRowSpec extends BoundarySpec
+  {
+    public CurrentRowSpec() {
+    }
+
+    /** @return always {@code Direction.CURRENT} */
+    @Override
+    public Direction getDirection() {
+      return Direction.CURRENT;
+    }
+
+    /** Ignored: the direction of the current row is fixed. */
+    @Override
+    public void setDirection(Direction dir) {
+    }
+
+    /** @return always 0 */
+    @Override
+    public int getAmt() {
+      return 0;
+    }
+
+    /** Ignored: the current row has no amount. */
+    @Override
+    public void setAmt(int amt) {
+    }
+
+    /** Compares by direction only. */
+    public int compareTo(BoundarySpec other)
+    {
+      Direction mine = getDirection();
+      Direction theirs = other.getDirection();
+      return mine.compareTo(theirs);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "currentRow";
+    }
+  }
+
+  public static class ValueBoundarySpec extends BoundarySpec
+  {
+    Direction direction;
+    ASTNode expression;
+    int amt;
+
+    /** Creates a boundary with no direction, no expression and amount 0. */
+    public ValueBoundarySpec() {
+    }
+
+    /**
+     * Creates a boundary whose expression is still unset.
+     *
+     * @param direction the side of the current row the boundary lies on
+     * @param amt the amount, or {@code UNBOUNDED_AMOUNT}
+     */
+    public ValueBoundarySpec(Direction direction, int amt)
+    {
+      super();
+      this.direction = direction;
+      this.amt = amt;
+    }
+
+    @Override
+    public Direction getDirection()
+    {
+      return direction;
+    }
+
+    @Override
+    public void setDirection(Direction direction)
+    {
+      this.direction = direction;
+    }
+
+    /** @return the expression whose value defines the boundary, null until set */
+    public ASTNode getExpression()
+    {
+      return expression;
+    }
+
+    /** @param expression the expression whose value defines the boundary */
+    public void setExpression(ASTNode expression)
+    {
+      this.expression = expression;
+    }
+
+    @Override
+    public int getAmt()
+    {
+      return amt;
+    }
+
+    @Override
+    public void setAmt(int amt)
+    {
+      this.amt = amt;
+    }
+
+    /**
+     * Renders the boundary as value(expression amount direction). The expression is
+     * unset right after construction, so it is rendered as "null" in that state instead
+     * of failing with a NullPointerException.
+     */
+    @Override
+    public String toString()
+    {
+      String exprText = expression == null ? null : expression.toStringTree();
+      return String.format("value(%s %s %s)", exprText, amt, direction);
+    }
+
+    /**
+     * Orders boundaries by direction first and by amount second.
+     *
+     * The other amount is read through {@code getAmt()}, so comparing against another
+     * kind of boundary with the same direction no longer fails with a
+     * ClassCastException. The amounts are compared explicitly instead of subtracted,
+     * which cannot overflow. Only the sign of the result is meaningful.
+     *
+     * @param other the boundary to compare with
+     * @return a negative value, zero or a positive value as this boundary sorts before,
+     *         equal to or after {@code other}
+     */
+    public int compareTo(BoundarySpec other)
+    {
+      int c = direction.compareTo(other.getDirection());
+      if (c != 0) {
+        return c;
+      }
+      int otherAmt = other.getAmt();
+      if (amt == otherAmt) {
+        return 0;
+      }
+      return amt < otherAmt ? -1 : 1;
+    }
+
+  }
+
+}