Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/02 16:16:37 UTC

svn commit: r1463556 [6/15] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/if/ ql/src/gen/thrift/gen-cpp/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/gen/thrift/gen-php/ ql/src/gen/thrift/gen...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Apr  2 14:16:34 2013
@@ -35,6 +35,8 @@ import java.util.regex.PatternSyntaxExce
 
 import org.antlr.runtime.tree.BaseTree;
 import org.antlr.runtime.tree.Tree;
+import org.antlr.runtime.tree.TreeWizard;
+import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
@@ -80,6 +82,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
@@ -117,6 +120,24 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFInputSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionedTableFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.CurrentRowSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.ValueBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFrameSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -152,6 +173,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.OrderExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -332,7 +357,9 @@ public class SemanticAnalyzer extends Ba
     switch (ast.getToken().getType()) {
     case HiveParser.TOK_QUERY: {
       QB qb = new QB(id, alias, true);
-      doPhase1(ast, qb, initPhase1Ctx());
+      Phase1Ctx ctx_1 = initPhase1Ctx();
+      doPhase1(ast, qb, ctx_1);
+
       qbexpr.setOpcode(QBExpr.Opcode.NULLOP);
       qbexpr.setQB(qb);
     }
@@ -358,13 +385,28 @@ public class SemanticAnalyzer extends Ba
   }
 
   private LinkedHashMap<String, ASTNode> doPhase1GetAggregationsFromSelect(
-      ASTNode selExpr) {
+      ASTNode selExpr, QB qb, String dest) {
+
     // Iterate over the selects, searching for aggregation Trees.
     // Use String as keys to eliminate duplicate trees.
     LinkedHashMap<String, ASTNode> aggregationTrees = new LinkedHashMap<String, ASTNode>();
     for (int i = 0; i < selExpr.getChildCount(); ++i) {
-      ASTNode sel = (ASTNode) selExpr.getChild(i).getChild(0);
-      doPhase1GetAllAggregations(sel, aggregationTrees);
+      ASTNode sel = (ASTNode) selExpr.getChild(i);
+      doPhase1GetAllAggregations((ASTNode) sel.getChild(0), aggregationTrees);
+    }
+
+    /*
+     * remove any aggregation to be handled by Windowing.
+     */
+    if ( queryProperties.hasWindowing() && qb.getWindowingSpec(dest) != null ) {
+      HashMap<String, ASTNode> aliasToWdwExprs = qb.getParseInfo().getWindowingExprsForClause(dest);
+      LinkedHashMap<String, ASTNode> aggTreesMinusWindowing = new LinkedHashMap<String, ASTNode>();
+      for(Map.Entry<String,ASTNode> entry : aggregationTrees.entrySet()) {
+        if ( !aliasToWdwExprs.containsKey(entry.getKey())) {
+          aggTreesMinusWindowing.put(entry.getKey(), entry.getValue());
+        }
+      }
+      aggregationTrees = aggTreesMinusWindowing;
     }
     return aggregationTrees;
   }
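
The change above carves windowing invocations out of the GroupBy aggregation map: any select-list UDAF already claimed by a windowing expression for this destination is dropped before GroupBy planning sees it. As a reading aid, here is a minimal self-contained sketch of that filtering pattern (hypothetical class, with String standing in for ASTNode):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class AggFilterSketch {
      static LinkedHashMap<String, String> minusWindowing(
          LinkedHashMap<String, String> aggTrees, Map<String, String> wdwExprs) {
        LinkedHashMap<String, String> result = new LinkedHashMap<String, String>();
        for (Map.Entry<String, String> e : aggTrees.entrySet()) {
          if (!wdwExprs.containsKey(e.getKey())) {
            result.put(e.getKey(), e.getValue());   // not claimed by windowing; keep it
          }
        }
        return result;
      }

      public static void main(String[] args) {
        LinkedHashMap<String, String> aggs = new LinkedHashMap<String, String>();
        aggs.put("sum(a)", "agg1");
        aggs.put("rank()", "agg2");
        Map<String, String> wdw = new LinkedHashMap<String, String>();
        wdw.put("rank()", "wdw1");
        System.out.println(minusWindowing(aggs, wdw).keySet());   // [sum(a)]
      }
    }
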
@@ -650,6 +692,17 @@ public class SemanticAnalyzer extends Ba
         processTable(qb, child);
       } else if (child.getToken().getType() == HiveParser.TOK_SUBQUERY) {
         processSubQuery(qb, child);
+      } else if (child.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) {
+        queryProperties.setHasPTF(true);
+        processPTF(qb, child);
+        PTFInvocationSpec ptfInvocationSpec = qb.getPTFInvocationSpec(child);
+        String inputAlias = ptfInvocationSpec == null ? null :
+          ((PartitionedTableFunctionSpec)ptfInvocationSpec.getFunction()).getAlias();
+        if ( inputAlias == null ) {
+          throw new SemanticException(generateErrorMessage(child,
+              "PTF invocation in a Join must have an alias"));
+        }
+
       } else if (child.getToken().getType() == HiveParser.TOK_LATERAL_VIEW) {
         // SELECT * FROM src1 LATERAL VIEW udtf() AS myTable JOIN src2 ...
         // is not supported. Instead, the lateral view must be in a subquery
@@ -741,11 +794,14 @@ public class SemanticAnalyzer extends Ba
           qbp.setHints((ASTNode) ast.getChild(0));
         }
 
-        LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast);
+        handleWindowingExprsInSelectList(qb, ctx_1.dest, ast);
+
+        LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast,
+            qb, ctx_1.dest);
         doPhase1GetColumnAliasesFromSelect(ast, qbp);
         qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
         qbp.setDistinctFuncExprsForClause(ctx_1.dest,
-            doPhase1GetDistinctFuncExprs(aggregations));
+        doPhase1GetDistinctFuncExprs(aggregations));
         break;
 
       case HiveParser.TOK_WHERE:
@@ -770,7 +826,6 @@ public class SemanticAnalyzer extends Ba
                 .getMsg(ast));
           }
         }
-
         qbp.setDestForClause(ctx_1.dest, (ASTNode) ast.getChild(0));
         break;
 
@@ -793,6 +848,9 @@ public class SemanticAnalyzer extends Ba
           queryProperties.setHasJoin(true);
           processJoin(qb, frm);
           qbp.setJoinExpr(frm);
+        }else if(frm.getToken().getType() == HiveParser.TOK_PTBLFUNCTION){
+          queryProperties.setHasPTF(true);
+          processPTF(qb, frm);
         }
         break;
 
@@ -819,7 +877,7 @@ public class SemanticAnalyzer extends Ba
         break;
 
       case HiveParser.TOK_SORTBY:
-        // Get the sort by aliases - these are aliased to the entries in the
+     // Get the sort by aliases - these are aliased to the entries in the
         // select list
         queryProperties.setHasSortBy(true);
         qbp.setSortByExprForClause(ctx_1.dest, ast);
@@ -873,7 +931,16 @@ public class SemanticAnalyzer extends Ba
 
       case HiveParser.TOK_HAVING:
         qbp.setHavingExprForClause(ctx_1.dest, ast);
-        qbp.addAggregationExprsForClause(ctx_1.dest, doPhase1GetAggregationsFromSelect(ast));
+        qbp.addAggregationExprsForClause(ctx_1.dest,
+            doPhase1GetAggregationsFromSelect(ast, qb, ctx_1.dest));
+        break;
+
+      case HiveParser.KW_WINDOW:
+        if (!qb.hasWindowingSpec(ctx_1.dest) ) {
+          throw new SemanticException(generateErrorMessage(ast,
+              "Query has no Cluster/Distribute By; but has a Window definition"));
+        }
+        handleQueryWindowClauses(qb, ctx_1, ast);
         break;
 
       case HiveParser.TOK_LIMIT:
@@ -2193,12 +2260,20 @@ public class SemanticAnalyzer extends Ba
       List<ASTNode> result = new ArrayList<ASTNode>(selectExprs == null ? 0
           : selectExprs.getChildCount());
       if (selectExprs != null) {
+        HashMap<String, ASTNode> windowingExprs = parseInfo.getWindowingExprsForClause(dest);
+
         for (int i = 0; i < selectExprs.getChildCount(); ++i) {
           if (((ASTNode) selectExprs.getChild(i)).getToken().getType() == HiveParser.TOK_HINTLIST) {
             continue;
           }
           // table.column AS alias
           ASTNode grpbyExpr = (ASTNode) selectExprs.getChild(i).getChild(0);
+          /*
+           * If this is handled by Windowing then ignore it.
+           */
+          if (windowingExprs != null && windowingExprs.containsKey(grpbyExpr.toStringTree())) {
+            continue;
+          }
           result.add(grpbyExpr);
         }
       }
@@ -2225,7 +2300,10 @@ public class SemanticAnalyzer extends Ba
     String tabAlias = null;
     String[] colRef = new String[2];
 
-    if (selExpr.getChildCount() == 2) {
+    //for queries with windowing expressions, the selexpr may have a third child
+    if (selExpr.getChildCount() == 2 ||
+        (selExpr.getChildCount() == 3 &&
+        selExpr.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC)) {
       // return zz for "xx + yy AS zz"
       colAlias = unescapeIdentifier(selExpr.getChild(1).getText());
       colRef[0] = tabAlias;
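
The extra condition above accepts a third child on a select item only when that child is a window specification, so a windowed select expression still yields its alias from child 1. A tiny sketch of the shape check, with an assumed token constant in place of Hive's generated id:

    public class SelExprShapeSketch {
      static final int TOK_WINDOWSPEC = 42;   // assumed constant, not Hive's real id

      static boolean aliasAtChildOne(int childCount, int thirdChildType) {
        // a select item is (expr), (expr, alias) or (expr, alias, windowSpec)
        return childCount == 2
            || (childCount == 3 && thirdChildType == TOK_WINDOWSPEC);
      }

      public static void main(String[] args) {
        System.out.println(aliasAtChildOne(2, -1));              // true: expr AS alias
        System.out.println(aliasAtChildOne(3, TOK_WINDOWSPEC));  // true: windowed select item
        System.out.println(aliasAtChildOne(3, -1));              // false: UDTF-style alias list
      }
    }
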
@@ -2302,6 +2380,7 @@ public class SemanticAnalyzer extends Ba
     return false;
   }
 
+
   private Operator<?> genSelectPlan(String dest, QB qb, Operator<?> input)
       throws SemanticException {
     ASTNode selExprList = qb.getParseInfo().getSelForClause(dest);
@@ -2439,11 +2518,14 @@ public class SemanticAnalyzer extends Ba
       // child can be EXPR AS ALIAS, or EXPR.
       ASTNode child = (ASTNode) exprList.getChild(i);
       boolean hasAsClause = (!isInTransform) && (child.getChildCount() == 2);
+      boolean isWindowSpec = child.getChildCount() == 3 &&
+          child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC;
 
       // EXPR AS (ALIAS,...) parses, but is only allowed for UDTF's
       // This check is not needed and invalid when there is a transform b/c the
       // AST's are slightly different.
-      if (!isInTransform && !isUDTF && child.getChildCount() > 2) {
+      if (!isWindowSpec && !isInTransform && !isUDTF && child.getChildCount() > 2) {
         throw new SemanticException(generateErrorMessage(
             (ASTNode) child.getChild(2),
             ErrorMsg.INVALID_AS.getMsg()));
@@ -6290,12 +6372,18 @@ public class SemanticAnalyzer extends Ba
     ASTNode right = (ASTNode) joinParseTree.getChild(1);
 
     if ((left.getToken().getType() == HiveParser.TOK_TABREF)
-        || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
+        || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)
+        || (left.getToken().getType() == HiveParser.TOK_PTBLFUNCTION)) {
       String tableName = getUnescapedUnqualifiedTableName((ASTNode) left.getChild(0))
           .toLowerCase();
       String alias = left.getChildCount() == 1 ? tableName
           : unescapeIdentifier(left.getChild(left.getChildCount() - 1)
-              .getText().toLowerCase());
+          .getText().toLowerCase());
+      // ptf node form is: ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+      // guaranteed to have an alias here: check done in processJoin
+      alias = (left.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) ?
+          unescapeIdentifier(left.getChild(1).getText().toLowerCase()) :
+            alias;
       joinTree.setLeftAlias(alias);
       String[] leftAliases = new String[1];
       leftAliases[0] = alias;
@@ -6321,12 +6409,18 @@ public class SemanticAnalyzer extends Ba
     }
 
     if ((right.getToken().getType() == HiveParser.TOK_TABREF)
-        || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
+        || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)
+        || (right.getToken().getType() == HiveParser.TOK_PTBLFUNCTION)) {
       String tableName = getUnescapedUnqualifiedTableName((ASTNode) right.getChild(0))
           .toLowerCase();
       String alias = right.getChildCount() == 1 ? tableName
           : unescapeIdentifier(right.getChild(right.getChildCount() - 1)
-              .getText().toLowerCase());
+          .getText().toLowerCase());
+      // ptf node form is: ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+      // guaranteed to have an alias here: check done in processJoin
+      alias = (right.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) ?
+          unescapeIdentifier(right.getChild(1).getText().toLowerCase()) :
+            alias;
       String[] rightAliases = new String[1];
       rightAliases[0] = alias;
       joinTree.setRightAliases(rightAliases);
@@ -7109,6 +7203,11 @@ public class SemanticAnalyzer extends Ba
       curr = genHavingPlan(dest, qb, curr);
     }
 
+
+    if(queryProperties.hasWindowing() && qb.getWindowingSpec(dest) != null) {
+      curr = genWindowingPlan(qb.getWindowingSpec(dest), curr);
+    }
+
     curr = genSelectPlan(dest, qb, curr);
     Integer limit = qbp.getDestLimit(dest);
 
@@ -7148,6 +7247,7 @@ public class SemanticAnalyzer extends Ba
       curr = genReduceSinkPlan(dest, qb, curr, numReducers);
     }
 
+
     if (qbp.getIsSubQ()) {
       if (limit != null) {
         // In case of order by, only 1 reducer is used, so no need of
@@ -7793,11 +7893,38 @@ public class SemanticAnalyzer extends Ba
       aliasToOpInfo.put(alias, op);
     }
 
+    Operator srcOpInfo = null;
+    Operator lastPTFOp = null;
+
+    if(queryProperties.hasPTF()){
+      //After processing subqueries and source tables, process
+      // partitioned table functions
+
+      HashMap<ASTNode, PTFInvocationSpec> ptfNodeToSpec = qb.getPTFNodeToSpec();
+      if ( ptfNodeToSpec != null ) {
+        for(Entry<ASTNode, PTFInvocationSpec> entry : ptfNodeToSpec.entrySet()) {
+          ASTNode ast = entry.getKey();
+          PTFInvocationSpec spec = entry.getValue();
+          String inputAlias = spec.getQueryInputName();
+          Operator inOp = aliasToOpInfo.get(inputAlias);
+          if ( inOp == null ) {
+            throw new SemanticException(generateErrorMessage(ast,
+                "Cannot resolve input Operator for PTF invocation"));
+          }
+          lastPTFOp = genPTFPlan(spec, inOp);
+          String ptfAlias = ((PartitionedTableFunctionSpec)spec.getFunction()).getAlias();
+          if ( ptfAlias != null ) {
+            aliasToOpInfo.put(ptfAlias, lastPTFOp);
+          }
+        }
+      }
+
+    }
+
     // For all the source tables that have a lateral view, attach the
     // appropriate operators to the TS
     genLateralViewPlans(aliasToOpInfo, qb);
 
-    Operator srcOpInfo = null;
 
     // process join
     if (qb.getParseInfo().getJoinExpr() != null) {
@@ -7820,6 +7947,9 @@ public class SemanticAnalyzer extends Ba
       // Now if there are more than 1 sources then we have a join case
       // later we can extend this to the union all case as well
       srcOpInfo = aliasToOpInfo.values().iterator().next();
+      // with PTFs, there may be more (note for PTF chains:
+      // 1 ptf invocation may entail multiple PTF operators)
+      srcOpInfo = lastPTFOp != null ? lastPTFOp : srcOpInfo;
     }
 
     Operator bodyOpInfo = genBodyPlan(qb, srcOpInfo);
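
The loop added above resolves each PTF invocation's input operator by alias, builds the PTF sub-plan on top of it, and registers the PTF's own alias so the rest of the query (e.g. a join) can reference its output. A hypothetical sketch of that wiring, with a stand-in Operator type:

    import java.util.HashMap;
    import java.util.Map;

    public class PtfWiringSketch {
      interface Op {}                                   // stand-in for Hive's Operator

      static Op genPTFPlan(String spec, Op input) {     // placeholder plan generator
        return input;
      }

      static Op wire(Map<String, Op> aliasToOp, String inputAlias,
                     String ptfAlias, String spec) {
        Op in = aliasToOp.get(inputAlias);
        if (in == null) {
          throw new IllegalStateException("Cannot resolve input Operator for PTF invocation");
        }
        Op ptfOp = genPTFPlan(spec, in);
        if (ptfAlias != null) {
          aliasToOp.put(ptfAlias, ptfOp);               // PTF output is referenced like a table
        }
        return ptfOp;
      }

      public static void main(String[] args) {
        Map<String, Op> aliasToOp = new HashMap<String, Op>();
        aliasToOp.put("src", new Op() {});
        System.out.println(wire(aliasToOp, "src", "f", "noop(...)") != null);  // true
      }
    }
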
@@ -8478,7 +8608,8 @@ public class SemanticAnalyzer extends Ba
     }
 
     // continue analyzing from the child ASTNode.
-    if (!doPhase1(child, qb, initPhase1Ctx())) {
+    Phase1Ctx ctx_1 = initPhase1Ctx();
+    if (!doPhase1(child, qb, ctx_1)) {
       // if phase1Result false return
       return;
     }
@@ -9672,4 +9803,1040 @@ public class SemanticAnalyzer extends Ba
   public void setQB(QB qb) {
     this.qb = qb;
   }
+
+//--------------------------- PTF handling -----------------------------------
+
+  /*
+   * - a partitionTableFunctionSource can be a tableReference, a SubQuery or another
+   *   PTF invocation.
+   * - For a TABLEREF: set the source to the alias returned by processTable
+   * - For a SubQuery: set the source to the alias returned by processSubQuery
+   * - For a PTF invocation: recursively call processPTFChain.
+   */
+  private PTFInputSpec processPTFSource(QB qb, ASTNode inputNode) throws SemanticException{
+
+    PTFInputSpec qInSpec = null;
+    int type = inputNode.getType();
+    String alias;
+    switch(type)
+    {
+    case HiveParser.TOK_TABREF:
+      alias = processTable(qb, inputNode);
+      qInSpec = new PTFQueryInputSpec();
+      ((PTFQueryInputSpec)qInSpec).setType(PTFQueryInputType.TABLE);
+      ((PTFQueryInputSpec)qInSpec).setSource(alias);
+      break;
+    case HiveParser.TOK_SUBQUERY:
+      alias = processSubQuery(qb, inputNode);
+      qInSpec = new PTFQueryInputSpec();
+      ((PTFQueryInputSpec)qInSpec).setType(PTFQueryInputType.SUBQUERY);
+      ((PTFQueryInputSpec)qInSpec).setSource(alias);
+      break;
+    case HiveParser.TOK_PTBLFUNCTION:
+      qInSpec = processPTFChain(qb, inputNode);
+      break;
+    default:
+      throw new SemanticException(generateErrorMessage(inputNode,
+          "Unknown input type to PTF"));
+    }
+
+    qInSpec.setAstNode(inputNode);
+    return qInSpec;
+
+  }
+
+  /*
+   * - tree form is
+   *   ^(TOK_PTBLFUNCTION name alias? partitionTableFunctionSource partitioningSpec? arguments*)
+   * - a partitionTableFunctionSource can be a tableReference, a SubQuery or another
+   *   PTF invocation.
+   */
+  private PartitionedTableFunctionSpec processPTFChain(QB qb, ASTNode ptf)
+      throws SemanticException{
+    int child_count = ptf.getChildCount();
+    if (child_count < 2) {
+      throw new SemanticException(generateErrorMessage(ptf,
+                  "Not enough Children " + child_count));
+    }
+
+    PartitionedTableFunctionSpec ptfSpec = new PartitionedTableFunctionSpec();
+    ptfSpec.setAstNode(ptf);
+
+    /*
+     * name
+     */
+    ASTNode nameNode = (ASTNode) ptf.getChild(0);
+    ptfSpec.setName(nameNode.getText());
+
+    int inputIdx = 1;
+
+    /*
+     * alias
+     */
+    ASTNode secondChild = (ASTNode) ptf.getChild(1);
+    if ( secondChild.getType() == HiveParser.Identifier ) {
+      ptfSpec.setAlias(secondChild.getText());
+      inputIdx++;
+    }
+
+    /*
+     * input
+     */
+    ASTNode inputNode = (ASTNode) ptf.getChild(inputIdx);
+    ptfSpec.setInput(processPTFSource(qb, inputNode));
+
+    int argStartIdx = inputIdx + 1;
+
+    /*
+     * partitioning Spec
+     */
+    int pSpecIdx = inputIdx + 1;
+    ASTNode pSpecNode = ptf.getChildCount() > pSpecIdx ?
+        (ASTNode) ptf.getChild(pSpecIdx) : null;
+    if (pSpecNode != null && pSpecNode.getType() == HiveParser.TOK_PARTITIONINGSPEC)
+    {
+      PartitioningSpec partitioning = processPTFPartitionSpec(pSpecNode);
+      ptfSpec.setPartitioning(partitioning);
+      argStartIdx++;
+    }
+
+    /*
+     * arguments
+     */
+    for(int i=argStartIdx; i < ptf.getChildCount(); i++)
+    {
+      ptfSpec.addArg((ASTNode) ptf.getChild(i));
+    }
+    return ptfSpec;
+  }
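
processPTFChain parses the children positionally: name, optional alias, input, optional partitioning spec, then the remaining children as arguments. A self-contained sketch of the same index-advance pattern over (tokenType, text) pairs, not Hive's real AST API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PtfChainParseSketch {
      public static void main(String[] args) {
        List<String[]> kids = Arrays.asList(
            new String[] {"Identifier", "noop"},
            new String[] {"Identifier", "t"},
            new String[] {"TOK_TABREF", "src"},
            new String[] {"TOK_PARTITIONINGSPEC", "distribute by a sort by b"},
            new String[] {"EXPR", "arg1"});

        String name = kids.get(0)[1];          // child 0 is always the function name
        int idx = 1;
        String alias = null;
        if ("Identifier".equals(kids.get(idx)[0])) {             // optional alias
          alias = kids.get(idx++)[1];
        }
        String input = kids.get(idx++)[1];     // table ref, subquery, or nested PTF
        String partitioning = null;
        if (idx < kids.size() && "TOK_PARTITIONINGSPEC".equals(kids.get(idx)[0])) {
          partitioning = kids.get(idx++)[1];   // optional partitioning spec
        }
        List<String> arguments = new ArrayList<String>();
        for (int i = idx; i < kids.size(); i++) {
          arguments.add(kids.get(i)[1]);       // everything left is an argument
        }
        System.out.println(name + " / " + alias + " / " + input
            + " / " + partitioning + " / " + arguments);
      }
    }
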
+
+  /*
+   * - invoked during FROM AST tree processing, on encountering a PTF invocation.
+   * - tree form is
+   *   ^(TOK_PTBLFUNCTION name partitionTableFunctionSource partitioningSpec? arguments*)
+   * - setup a PTFInvocationSpec for this top level PTF invocation.
+   */
+  private void processPTF(QB qb, ASTNode ptf) throws SemanticException{
+
+    PartitionedTableFunctionSpec ptfSpec = processPTFChain(qb, ptf);
+
+    if ( ptfSpec.getAlias() != null ) {
+      qb.addAlias(ptfSpec.getAlias());
+    }
+
+    PTFInvocationSpec spec = new PTFInvocationSpec();
+    spec.setFunction(ptfSpec);
+    qb.addPTFNodeToSpec(ptf, spec);
+  }
+
+//--------------------------- Windowing handling -----------------------------------
+
+  /*
+   * - A Select Item form is: ^(TOK_SELEXPR selectExpression Identifier* window_specification?)
+   * What makes a UDAF invocation a Windowing Function invocation:
+   * 1. It appears in a SelectExpr that has a WindowSpec
+   * 2. It is a UDAF that implies order (FunctionRegistry.impliesOrder)
+   * 3. It contains lead/lag UDF invocations in its args.
+   */
+  private boolean checkAndExtractWindowFunctionsInSelect(QB qb, ASTNode selectExpr, String dest)
+      throws SemanticException {
+
+    int childCount = selectExpr.getChildCount();
+    ASTNode windowSpec = (ASTNode) selectExpr.getChild(childCount - 1);
+
+    boolean hasWindowSpec = windowSpec.getType() == HiveParser.TOK_WINDOWSPEC;
+
+    ArrayList<ASTNode> functions =
+        extractWindowingUDAFs((ASTNode) selectExpr.getChild(0), !hasWindowSpec);
+    if ( functions.size() == 0 ) {
+      return false;
+    }
+
+    WindowingSpec spec = qb.getWindowingSpec(dest);
+    if(spec == null) {
+      queryProperties.setHasWindowing(true);
+      spec = new WindowingSpec();
+      qb.addDestToWindowingSpec(dest, spec);
+    }
+
+    HashMap<String, ASTNode> wExprsInDest = qb.getParseInfo().getWindowingExprsForClause(dest);
+    int wColIdx = spec.getWindowExpressions() == null ? 0 : spec.getWindowExpressions().size();
+    for(ASTNode function : functions) {
+      WindowFunctionSpec wFnSpec = processWindowFunction(function,
+          hasWindowSpec ? windowSpec : null);
+
+      /*
+       * If this is a duplicate invocation of a function, don't add it to the WindowingSpec.
+       */
+      if ( wExprsInDest != null &&
+          wExprsInDest.containsKey(wFnSpec.getExpression().toStringTree())) {
+        continue;
+      }
+      wFnSpec.setAlias("_wcol" + wColIdx++);
+      spec.addWindowFunction(wFnSpec);
+      qb.getParseInfo().addWindowingExprToClause(dest, wFnSpec.getExpression());
+    }
+    return true;
+  }
+
+  /*
+   * return the UDAFs within the expressionTree.
+   * If implyOrder is true, then only return the invocations that:
+   * - are for UDAFs that imply order (FunctionRegistry.impliesOrder)
+   * - or contain a Lead/Lag UDF invocation in their arguments
+   * If implyOrder is false, then return all UDAF invocations.
+   */
+  private ArrayList<ASTNode> extractWindowingUDAFs(ASTNode expressionTree, boolean implyOrder) {
+    ArrayList<ASTNode> aggregations = new ArrayList<ASTNode>();
+    extractWindowingUDAFs(expressionTree, aggregations);
+    if (!implyOrder) {
+      return aggregations;
+    }
+    ArrayList<ASTNode> wdwUDAFs = new ArrayList<ASTNode>();
+    for(ASTNode function : aggregations) {
+      String fnName = function.getChild(0).getText().toLowerCase();
+      if ( FunctionRegistry.impliesOrder(fnName)) {
+        wdwUDAFs.add(function);
+        continue;
+      }
+      boolean hasLLInArgs = false;
+      for(int i=1; i < function.getChildCount(); i++) {
+        ASTNode child = (ASTNode) function.getChild(i);
+        hasLLInArgs = containsLeadLagUDF(child);
+        if (hasLLInArgs) {
+          break;
+        }
+      }
+      if (hasLLInArgs) {
+        wdwUDAFs.add(function);
+      }
+    }
+    return wdwUDAFs;
+  }
+
+  private void extractWindowingUDAFs(ASTNode expressionTree,
+      ArrayList<ASTNode> aggregations) {
+    int exprTokenType = expressionTree.getToken().getType();
+    if (exprTokenType == HiveParser.TOK_FUNCTION
+        || exprTokenType == HiveParser.TOK_FUNCTIONDI
+        || exprTokenType == HiveParser.TOK_FUNCTIONSTAR) {
+      assert (expressionTree.getChildCount() != 0);
+      if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
+        String functionName = unescapeIdentifier(expressionTree.getChild(0)
+            .getText());
+        WindowFunctionInfo fi = FunctionRegistry.getWindowFunctionInfo(functionName);
+        if (fi != null) {
+          aggregations.add(expressionTree);
+          return;
+        }
+      }
+    }
+    for (int i = 0; i < expressionTree.getChildCount(); i++) {
+      extractWindowingUDAFs((ASTNode) expressionTree.getChild(i),
+          aggregations);
+    }
+  }
+
+  private boolean containsLeadLagUDF(ASTNode expressionTree) {
+    int exprTokenType = expressionTree.getToken().getType();
+    if (exprTokenType == HiveParser.TOK_FUNCTION) {
+      assert (expressionTree.getChildCount() != 0);
+      if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
+        String functionName = unescapeIdentifier(expressionTree.getChild(0)
+            .getText());
+        functionName = functionName.toLowerCase();
+        if ( FunctionRegistry.LAG_FUNC_NAME.equals(functionName) ||
+            FunctionRegistry.LEAD_FUNC_NAME.equals(functionName)
+            ) {
+          return true;
+        }
+      }
+    }
+    for (int i = 0; i < expressionTree.getChildCount(); i++) {
+      if ( containsLeadLagUDF((ASTNode) expressionTree.getChild(i))) {
+        return true;
+      }
+    }
+    return false;
+  }
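
containsLeadLagUDF is a short-circuiting recursive search over the expression tree. The same pattern on a toy tree type (hypothetical names, not Hive classes):

    import java.util.Arrays;
    import java.util.List;

    public class LeadLagSearchSketch {
      static class Node {
        final String name;
        final List<Node> kids;
        Node(String name, Node... kids) { this.name = name; this.kids = Arrays.asList(kids); }
      }

      static boolean containsLeadLag(Node n) {
        if ("lead".equals(n.name) || "lag".equals(n.name)) {
          return true;                         // short-circuit on the first hit
        }
        for (Node k : n.kids) {
          if (containsLeadLag(k)) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        Node expr = new Node("sum",
            new Node("-", new Node("p_size"), new Node("lag", new Node("p_size"))));
        System.out.println(containsLeadLag(expr));   // true: sum(p_size - lag(p_size))
      }
    }
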
+
+  /*
+   * - Invoked during Phase1 when a TOK_SELECT is encountered.
+   * - Select tree form is: ^(TOK_SELECT ^(TOK_SELECTEXPR...) ^(TOK_SELECTEXPR...) ...)
+   * - A Select Item form is: ^(TOK_SELEXPR selectExpression Identifier* window_specification?)
+   *
+   * See checkAndExtractWindowFunctionsInSelect for rules on what makes a UDAF invocation
+   * a Windowing Function invocation
+   */
+  private void handleWindowingExprsInSelectList(QB qb, String dest, ASTNode selectNode)
+      throws SemanticException {
+    for(int i=0; i < selectNode.getChildCount(); i++)
+    {
+      ASTNode selectExpr = (ASTNode) selectNode.getChild(i);
+      if ( selectExpr.getType() != HiveParser.TOK_SELEXPR )
+      {
+        continue;
+      }
+      checkAndExtractWindowFunctionsInSelect(qb, selectExpr, dest);
+    }
+  }
+
+  private void handleQueryWindowClauses(QB qb, Phase1Ctx ctx_1, ASTNode node)
+      throws SemanticException {
+    WindowingSpec spec = qb.getWindowingSpec(ctx_1.dest);
+    for(int i=0; i < node.getChildCount(); i++) {
+      processQueryWindowClause(spec, (ASTNode) node.getChild(i));
+    }
+  }
+
+  private PartitionSpec processPartitionSpec(ASTNode node) {
+    PartitionSpec pSpec = new PartitionSpec();
+    int exprCnt = node.getChildCount();
+    for(int i=0; i < exprCnt; i++) {
+      PartitionExpression exprSpec = new PartitionExpression();
+      exprSpec.setExpression((ASTNode) node.getChild(i));
+      pSpec.addExpression(exprSpec);
+    }
+    return pSpec;
+  }
+
+  private OrderSpec processOrderSpec(ASTNode sortNode) {
+    OrderSpec oSpec = new OrderSpec();
+    int exprCnt = sortNode.getChildCount();
+    for(int i=0; i < exprCnt; i++) {
+      OrderExpression exprSpec = new OrderExpression();
+      exprSpec.setExpression((ASTNode) sortNode.getChild(i).getChild(0));
+      if ( sortNode.getChild(i).getType() == HiveParser.TOK_TABSORTCOLNAMEASC ) {
+        exprSpec.setOrder(org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.ASC);
+      }
+      else {
+        exprSpec.setOrder(org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.DESC);
+      }
+      oSpec.addExpression(exprSpec);
+    }
+    return oSpec;
+  }
+
+  private PartitioningSpec processPTFPartitionSpec(ASTNode pSpecNode)
+  {
+    PartitioningSpec partitioning = new PartitioningSpec();
+    ASTNode firstChild = (ASTNode) pSpecNode.getChild(0);
+    int type = firstChild.getType();
+
+    if ( type == HiveParser.TOK_DISTRIBUTEBY || type == HiveParser.TOK_CLUSTERBY )
+    {
+      PartitionSpec pSpec = processPartitionSpec(firstChild);
+      partitioning.setPartSpec(pSpec);
+      ASTNode sortNode = pSpecNode.getChildCount() > 1 ? (ASTNode) pSpecNode.getChild(1) : null;
+      if ( sortNode != null )
+      {
+        OrderSpec oSpec = processOrderSpec(sortNode);
+        partitioning.setOrderSpec(oSpec);
+      }
+    }
+    else if ( type == HiveParser.TOK_SORTBY || type == HiveParser.TOK_ORDERBY ) {
+      ASTNode sortNode = firstChild;
+      OrderSpec oSpec = processOrderSpec(sortNode);
+      partitioning.setOrderSpec(oSpec);
+    }
+    return partitioning;
+  }
+
+  private WindowFunctionSpec processWindowFunction(ASTNode node, ASTNode wsNode)
+    throws SemanticException {
+    WindowFunctionSpec wfSpec = new WindowFunctionSpec();
+
+    switch(node.getType()) {
+    case HiveParser.TOK_FUNCTIONSTAR:
+      wfSpec.setStar(true);
+      break;
+    case HiveParser.TOK_FUNCTIONDI:
+      wfSpec.setDistinct(true);
+      break;
+    }
+
+    if ( wfSpec.isDistinct() ) {
+      throw new SemanticException(generateErrorMessage(node,
+          "Count/Sum distinct not supported with Windowing"));
+    }
+
+    wfSpec.setExpression(node);
+
+    ASTNode nameNode = (ASTNode) node.getChild(0);
+    wfSpec.setName(nameNode.getText());
+
+    for(int i=1; i < node.getChildCount(); i++) {
+      ASTNode child = (ASTNode) node.getChild(i);
+      wfSpec.addArg(child);
+    }
+
+    if ( wsNode != null ) {
+      WindowSpec ws = processWindowSpec(wsNode);
+      wfSpec.setWindowSpec(ws);
+    }
+
+    /*
+     * In order to distinguish between invocations of the same UDAF over different Windows,
+     * add the WdwSpec node as a child of the Function Node.
+     * It is safe to do this after the function node has been converted to a WdwFuncSpec.
+     */
+    if ( wsNode != null ) {
+      node.addChild(wsNode);
+    }
+
+    return wfSpec;
+  }
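
The comment above describes appending the window-spec node to the function node so that duplicate detection keys differ. A sketch of why that matters when expression strings are used as map keys:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class WindowDedupSketch {
      public static void main(String[] args) {
        Map<String, String> exprs = new LinkedHashMap<String, String>();
        String fn = "(TOK_FUNCTION sum (TOK_TABLE_OR_COL p_size))";
        // without the appended window text both invocations would share one key
        exprs.put(fn + " (TOK_WINDOWSPEC w1)", "sum over w1");
        exprs.put(fn + " (TOK_WINDOWSPEC w2)", "sum over w2");
        System.out.println(exprs.size());      // 2: distinct keys, both functions survive
      }
    }
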
+
+  private void processQueryWindowClause(WindowingSpec spec, ASTNode node)
+      throws SemanticException {
+    ASTNode nameNode = (ASTNode) node.getChild(0);
+    ASTNode wsNode = (ASTNode) node.getChild(1);
+    if(spec.getWindowSpecs() != null && spec.getWindowSpecs().containsKey(nameNode.getText())){
+      throw new SemanticException(generateErrorMessage(nameNode,
+          "Duplicate definition of window " + nameNode.getText() +
+          " is not allowed"));
+    }
+    WindowSpec ws = processWindowSpec(wsNode);
+    spec.addWindowSpec(nameNode.getText(), ws);
+  }
+
+  private WindowSpec processWindowSpec(ASTNode node) throws SemanticException {
+    String sourceId = null;
+    PartitionSpec partition = null;
+    OrderSpec order = null;
+    WindowFrameSpec windowFrame = null;
+
+    boolean hasSrcId = false, hasPartSpec = false, hasWF = false;
+    int srcIdIdx = -1, partIdx = -1, wfIdx = -1;
+
+    for(int i=0; i < node.getChildCount(); i++)
+    {
+      int type = node.getChild(i).getType();
+      switch(type)
+      {
+      case HiveParser.Identifier:
+        hasSrcId = true; srcIdIdx = i;
+        break;
+      case HiveParser.TOK_PARTITIONINGSPEC:
+        hasPartSpec = true; partIdx = i;
+        break;
+      case HiveParser.TOK_WINDOWRANGE:
+      case HiveParser.TOK_WINDOWVALUES:
+        hasWF = true; wfIdx = i;
+        break;
+      }
+    }
+
+    WindowSpec ws = new WindowSpec();
+
+    if (hasSrcId) {
+      ASTNode nameNode = (ASTNode) node.getChild(srcIdIdx);
+      ws.setSourceId(nameNode.getText());
+    }
+
+    if (hasPartSpec) {
+      ASTNode partNode = (ASTNode) node.getChild(partIdx);
+      PartitioningSpec partitioning = processPTFPartitionSpec(partNode);
+      ws.setPartitioning(partitioning);
+    }
+
+    if ( hasWF)
+    {
+      ASTNode wfNode = (ASTNode) node.getChild(wfIdx);
+      WindowFrameSpec wfSpec = processWindowFrame(wfNode);
+      ws.setWindowFrame(wfSpec);
+    }
+
+    return ws;
+  }
+
+  private WindowFrameSpec processWindowFrame(ASTNode node) throws SemanticException {
+    int type = node.getType();
+    BoundarySpec start = null, end = null;
+
+    /*
+     * A WindowFrame may contain just the Start Boundary or in the
+     * between style of expressing a WindowFrame both boundaries
+     * are specified.
+     */
+    start = processBoundary(type, (ASTNode) node.getChild(0));
+    if ( node.getChildCount() > 1 ) {
+      end = processBoundary(type, (ASTNode) node.getChild(1));
+    }
+
+    return new WindowFrameSpec(start, end);
+  }
+
+  private BoundarySpec processBoundary(int frameType, ASTNode node)  throws SemanticException {
+    BoundarySpec bs = frameType == HiveParser.TOK_WINDOWRANGE ?
+        new RangeBoundarySpec() : new ValueBoundarySpec();
+    int type = node.getType();
+    boolean hasAmt = true;
+
+    switch(type)
+    {
+    case HiveParser.KW_PRECEDING:
+      bs.setDirection(Direction.PRECEDING);
+      break;
+    case HiveParser.KW_FOLLOWING:
+      bs.setDirection(Direction.FOLLOWING);
+      break;
+    case HiveParser.KW_CURRENT:
+      bs = new CurrentRowSpec();
+      hasAmt = false;
+      break;
+    }
+
+    if ( hasAmt )
+    {
+      ASTNode amtNode = (ASTNode) node.getChild(0);
+      if ( amtNode.getType() == HiveParser.KW_UNBOUNDED)
+      {
+        bs.setAmt(BoundarySpec.UNBOUNDED_AMOUNT);
+      }
+      else
+      {
+        int amt = Integer.parseInt(amtNode.getText());
+        if ( amt < 0 ) {
+          throw new SemanticException(
+              "Window Frame Boundary Amount must be a +ve integer, amount provide is: " + amt);
+        }
+        bs.setAmt(amt);
+      }
+    }
+
+    return bs;
+  }
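
processBoundary treats CURRENT ROW as amount-less and otherwise parses either UNBOUNDED or a non-negative integer. A simplified model of the amount handling (hypothetical names):

    public class BoundaryAmountSketch {
      static final int UNBOUNDED_AMOUNT = Integer.MAX_VALUE;   // assumed sentinel

      static int parseAmount(String amt) {
        if ("UNBOUNDED".equalsIgnoreCase(amt)) {
          return UNBOUNDED_AMOUNT;
        }
        int v = Integer.parseInt(amt);
        if (v < 0) {
          throw new IllegalArgumentException(
              "Window Frame Boundary Amount must be a positive integer, amount provided is: " + v);
        }
        return v;
      }

      public static void main(String[] args) {
        System.out.println(parseAmount("2"));           // e.g. "2 PRECEDING"
        System.out.println(parseAmount("UNBOUNDED"));   // unbounded boundary
      }
    }
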
+
+  /*
+   * check if a Select Expr is a constant.
+   * - current logic used is to look for HiveParser.TOK_TABLE_OR_COL
+   * - if there is none then the expression is a constant.
+   */
+  private static class ConstantExprCheck implements ContextVisitor {
+    boolean isConstant = true;
+
+    public void visit(Object t, Object parent, int childIndex, Map labels) {
+      if ( !isConstant ) {
+        return;
+      }
+      if (ParseDriver.adaptor.getType(t) == HiveParser.TOK_TABLE_OR_COL ) {
+        isConstant = false;
+      }
+    }
+
+    public void reset() {
+      isConstant = true;
+    }
+
+    protected boolean isConstant() {
+      return isConstant;
+    }
+  }
+
+  private static class AggregationExprCheck implements ContextVisitor {
+    HashMap<String, ASTNode> destAggrExprs;
+    boolean isAggr = false;
+
+    public AggregationExprCheck(HashMap<String, ASTNode> destAggrExprs) {
+      super();
+      this.destAggrExprs = destAggrExprs;
+    }
+
+    public void visit(Object t, Object parent, int childIndex, Map labels) {
+      if ( isAggr ) {
+        return;
+      }
+      if ( destAggrExprs.values().contains(t)) {
+        isAggr = true;
+      }
+    }
+
+    public void reset() {
+      isAggr = false;
+    }
+
+    protected boolean isAggr() {
+      return isAggr;
+    }
+  }
+
+  /*
+   * Returns false if there is a SelectExpr that is not a constant or an aggregation.
+   */
+  private boolean isValidGroupBySelectList(QB currQB, String clause){
+    ConstantExprCheck constantExprCheck = new ConstantExprCheck();
+    AggregationExprCheck aggrExprCheck = new AggregationExprCheck(
+        currQB.getParseInfo().getAggregationExprsForClause(clause));
+
+    TreeWizard tw = new TreeWizard(ParseDriver.adaptor, HiveParser.tokenNames);
+    ASTNode selectNode = currQB.getParseInfo().getSelForClause(clause);
+
+    /*
+     * for Select Distinct Queries we don't move any aggregations.
+     */
+    if ( selectNode != null && selectNode.getType() == HiveParser.TOK_SELECTDI ) {
+      return true;
+    }
+
+    for (int i = 0; selectNode != null && i < selectNode.getChildCount(); i++) {
+      ASTNode selectExpr = (ASTNode) selectNode.getChild(i);
+      //check for TOK_HINTLIST expressions on ast
+      if(selectExpr.getType() != HiveParser.TOK_SELEXPR){
+        continue;
+      }
+
+      constantExprCheck.reset();
+      PTFTranslator.visit(selectExpr.getChild(0), constantExprCheck);
+
+      if ( !constantExprCheck.isConstant() ) {
+        aggrExprCheck.reset();
+        PTFTranslator.visit(selectExpr.getChild(0), aggrExprCheck);
+        if (!aggrExprCheck.isAggr() ) {
+          return false;
+        }
+      }
+
+    }
+    return true;
+  }
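
ConstantExprCheck and AggregationExprCheck are flag-setting visitors driven by a full tree walk. The pattern in miniature, with a toy node type standing in for TreeWizard/ASTNode:

    import java.util.Arrays;
    import java.util.List;

    public class ConstantCheckSketch {
      interface Visitor { void visit(String token); }

      static class Node {
        final String token;
        final List<Node> kids;
        Node(String token, Node... kids) { this.token = token; this.kids = Arrays.asList(kids); }
      }

      static void walk(Node n, Visitor v) {
        v.visit(n.token);                      // visit every node, top-down
        for (Node k : n.kids) {
          walk(k, v);
        }
      }

      public static void main(String[] args) {
        final boolean[] isConstant = { true };
        Node expr = new Node("+", new Node("1"),
            new Node("TOK_TABLE_OR_COL", new Node("a")));
        walk(expr, new Visitor() {
          public void visit(String token) {
            if ("TOK_TABLE_OR_COL".equals(token)) {
              isConstant[0] = false;           // a column reference disqualifies the expr
            }
          }
        });
        System.out.println(isConstant[0]);     // false: 1 + a is not a constant
      }
    }
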
+
+//--------------------------- PTF handling: PTFInvocationSpec to PTFDesc --------------------------
+
+  private PTFDesc translatePTFInvocationSpec(PTFInvocationSpec ptfQSpec, RowResolver inputRR)
+      throws SemanticException{
+    PTFDesc ptfDesc = null;
+    PTFTranslator translator = new PTFTranslator();
+    ptfDesc = translator.translate(ptfQSpec, this, conf, inputRR, unparseTranslator);
+    return ptfDesc;
+  }
+
+  Operator genPTFPlan(PTFInvocationSpec ptfQSpec, Operator input) throws SemanticException {
+    ArrayList<PTFInvocationSpec> componentQueries = PTFTranslator.componentize(ptfQSpec);
+    for (PTFInvocationSpec ptfSpec : componentQueries) {
+      input = genPTFPlanForComponentQuery(ptfSpec, input);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created PTF Plan ");
+    }
+    return input;
+  }
+
+
+  /**
+   * Construct the data structures containing ExprNodeDesc for partition
+   * columns and order columns. Use the input definition to construct the list
+   * of output columns for the ReduceSinkOperator
+   *
+   * @throws SemanticException
+   */
+  void buildPTFReduceSinkDetails(PartitionedTableFunctionDef tabDef,
+      RowResolver inputRR,
+      ArrayList<ExprNodeDesc> partCols,
+      ArrayList<ExprNodeDesc> valueCols,
+      ArrayList<ExprNodeDesc> orderCols,
+      Map<String, ExprNodeDesc> colExprMap,
+      List<String> outputColumnNames,
+      StringBuilder orderString,
+      RowResolver extractRR) throws SemanticException {
+
+    ArrayList<PTFExpressionDef> partColList = tabDef.getPartition().getExpressions();
+
+    for (PTFExpressionDef colDef : partColList) {
+      partCols.add(colDef.getExprNode());
+      orderCols.add(colDef.getExprNode());
+      orderString.append('+');
+    }
+
+    /*
+     * Order columns are used as key columns for constructing
+     * the ReduceSinkOperator
+     * Since we do not explicitly add these to outputColumnNames,
+     * we need to set includeKeyCols = false while creating the
+     * ReduceSinkDesc
+     */
+    ArrayList<OrderExpressionDef> orderColList = tabDef.getOrder().getExpressions();
+    for (int i = 0; i < orderColList.size(); i++) {
+      OrderExpressionDef colDef = orderColList.get(i);
+      org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = colDef.getOrder();
+      if (order.name().equals("ASC")) {
+        orderString.append('+');
+      } else {
+        orderString.append('-');
+      }
+      orderCols.add(colDef.getExprNode());
+    }
+
+    /*
+     * We add the column to value columns or output column names
+     * only if it is not a virtual column
+     */
+    ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
+    LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+        new LinkedHashMap<String[], ColumnInfo>();
+    int pos = 0;
+    for (ColumnInfo colInfo : colInfoList) {
+      if (!colInfo.isHiddenVirtualCol()) {
+        ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
+            .getInternalName(), colInfo.getTabAlias(), colInfo
+            .getIsVirtualCol());
+        valueCols.add(valueColExpr);
+        colExprMap.put(colInfo.getInternalName(), valueColExpr);
+        String outColName = SemanticAnalyzer.getColumnInternalName(pos++);
+        outputColumnNames.add(outColName);
+
+        String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+        /*
+         * If we have already encountered this colInfo's internalName, we are
+         * seeing it again because it must also be added for the Having clause.
+         * We add these entries at the end, in a loop over colsAddedByHaving. See below.
+         */
+        if ( colsAddedByHaving.containsKey(alias)) {
+          continue;
+        }
+        ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+        ColumnInfo eColInfo = new ColumnInfo(
+            outColName, colInfo.getType(), alias[0],
+            colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+        if ( astNode == null ) {
+          extractRR.put(alias[0], alias[1], eColInfo);
+        }
+        else {
+          /*
+           * a column referred to in the having clause may have been added twice:
+           * once with the ASTNode.toStringTree as the alias
+           * and then with the real alias.
+           */
+          extractRR.putExpression(astNode, eColInfo);
+          if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+            colsAddedByHaving.put(alias, eColInfo);
+          }
+        }
+      }
+    }
+
+    for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
+      String[] alias = columnAddedByHaving.getKey();
+      ColumnInfo eColInfo = columnAddedByHaving.getValue();
+      extractRR.put(alias[0], alias[1], eColInfo);
+    }
+  }
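
buildPTFReduceSinkDetails encodes the sort directions as a string of '+'/'-' characters, one per key column, which is later handed to the ReduceSinkDesc. A sketch of that encoding:

    import java.util.Arrays;
    import java.util.List;

    public class OrderStringSketch {
      static String orderString(int numPartCols, List<String> orders) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < numPartCols; i++) {
          sb.append('+');                      // partition columns default to ascending
        }
        for (String o : orders) {
          sb.append("ASC".equals(o) ? '+' : '-');
        }
        return sb.toString();                  // one character per key column
      }

      public static void main(String[] args) {
        System.out.println(orderString(2, Arrays.asList("ASC", "DESC")));  // "+++-"
      }
    }
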
+
+  private Operator genPTFPlanForComponentQuery(PTFInvocationSpec ptfQSpec, Operator input)
+      throws SemanticException {
+    /*
+     * 1. Create the PTFDesc from the Qspec attached to this QB.
+     */
+    RowResolver rr = opParseCtx.get(input).getRowResolver();
+    PTFDesc ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+    /*
+     * Build an RR for the Extract Op from the ReduceSink Op's RR.
+     * Why?
+     * We need to remove the Virtual Columns present in the RS's RR. The OI
+     * that gets passed to Extract at runtime doesn't contain the Virtual Columns.
+     * So internal names get changed. Consider testCase testJoinWithLeadLag,
+     * which is a self join on part and also has a Windowing expression.
+     * The RR of the RS op at translation time looks something like this:
+     * (_col1,_col2,..,_col7, _col8(vc=true),_col9(vc=true),
+     * _col10,_col11,.._col15(vc=true),_col16(vc=true),..)
+     * At runtime the Virtual columns are removed and all the columns after _col7
+     * are shifted 1 or 2 positions.
+     * So in child Operators the ExprNodeColumnDescs are no longer referring to the right columns.
+     *
+     * So we build a new RR for the Extract Op, with the Virtual Columns removed.
+     * We hand this to the PTFTranslator as the
+     * starting RR to use to translate a PTF Chain.
+     */
+    RowResolver extractOpRR = new RowResolver();
+
+    /*
+     * 2. build Map-side Op Graph. Graph template is either:
+     * Input -> PTF_map -> ReduceSink
+     * or
+     * Input -> ReduceSink
+     *
+     * Here the ExprNodeDescriptors in the QueryDef are based on the Input Operator's RR.
+     */
+    {
+      PartitionedTableFunctionDef tabDef = ptfDesc.getStartOfChain();
+
+      /*
+       * a. add Map-side PTF Operator if needed
+       */
+      if (tabDef.isTransformsRawInput() )
+      {
+        RowResolver ptfMapRR = tabDef.getRawInputShape().getRr();
+
+        input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+            new RowSchema(ptfMapRR.getColumnInfos()),
+            input), ptfMapRR);
+        rr = opParseCtx.get(input).getRowResolver();
+      }
+
+      /*
+       * b. Build Reduce Sink Details (keyCols, valueCols, outColNames etc.) for this ptfDesc.
+       */
+
+      ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+      ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+      ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
+      Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+      List<String> outputColumnNames = new ArrayList<String>();
+      StringBuilder orderString = new StringBuilder();
+
+      /*
+       * Use the input RR of TableScanOperator in case there is no map-side
+       * reshape of input.
+       * If the parent of ReduceSinkOperator is PTFOperator, use its
+       * output RR.
+       */
+      buildPTFReduceSinkDetails(tabDef,
+          rr,
+          partCols,
+          valueCols,
+          orderCols,
+          colExprMap,
+          outputColumnNames,
+          orderString,
+          extractOpRR);
+
+      input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+          .getReduceSinkDesc(orderCols,
+              valueCols, outputColumnNames, false,
+              -1, partCols, orderString.toString(), -1),
+          new RowSchema(rr.getColumnInfos()), input), rr);
+      input.setColumnExprMap(colExprMap);
+    }
+
+    /*
+     * 3. build Reduce-side Op Graph
+     */
+    {
+      /*
+       * b. Construct Extract Operator.
+       */
+      input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+          new ExtractDesc(
+              new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+                  Utilities.ReduceField.VALUE
+                  .toString(), "", false)),
+          new RowSchema(extractOpRR.getColumnInfos()),
+          input), extractOpRR);
+
+      /*
+       * c. Rebuild the QueryDef.
+       * Why?
+       * - so that the ExprNodeDescriptors in the QueryDef are based on the
+       *   Extract Operator's RowResolver
+       */
+      rr = opParseCtx.get(input).getRowResolver();
+      ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+
+      /*
+       * d. Construct PTF Operator.
+       */
+      RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+      input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+          new RowSchema(ptfOpRR.getColumnInfos()),
+          input), ptfOpRR);
+
+    }
+
+    return input;
+
+  }
+
+//--------------------------- Windowing handling: PTFInvocationSpec to PTFDesc --------------------
+
+  Operator genWindowingPlan(WindowingSpec wSpec, Operator input) throws SemanticException {
+    wSpec.validateAndMakeEffective();
+    WindowingComponentizer groups = new WindowingComponentizer(wSpec);
+    RowResolver rr = opParseCtx.get(input).getRowResolver();
+
+    while(groups.hasNext() ) {
+      wSpec = groups.next(conf, this, unparseTranslator, rr);
+      input = genReduceSinkPlanForWindowing(wSpec, rr, input);
+      rr = opParseCtx.get(input).getRowResolver();
+      PTFTranslator translator = new PTFTranslator();
+      PTFDesc ptfDesc = translator.translate(wSpec, this, conf, rr, unparseTranslator);
+      RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+      input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+          new RowSchema(ptfOpRR.getColumnInfos()),
+          input), ptfOpRR);
+      rr = ptfOpRR;
+    }
+
+    return input;
+  }
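
genWindowingPlan consumes the componentized groups one at a time, layering a ReduceSink + PTF stage per distinct partitioning, with each stage's output feeding the next. A sketch of the grouping and chaining idea, with hypothetical strings in place of specs and operators:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ComponentizerSketch {
      public static void main(String[] args) {
        // hypothetical "function -> partitioning" pairs from one select list
        String[][] fns = { {"rank", "partition by a order by b"},
                           {"sum",  "partition by a order by b"},
                           {"avg",  "partition by c"} };
        Map<String, List<String>> groups = new LinkedHashMap<String, List<String>>();
        for (String[] fn : fns) {
          List<String> g = groups.get(fn[1]);
          if (g == null) {
            g = new ArrayList<String>();
            groups.put(fn[1], g);          // group key is the partitioning spec
          }
          g.add(fn[0]);
        }
        String plan = "input";
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
          plan = "PTF(" + e.getValue() + " over [" + e.getKey() + "], RS(" + plan + "))";
        }
        System.out.println(plan);          // two chained RS+PTF stages
      }
    }
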
+
+  private Operator genReduceSinkPlanForWindowing(WindowingSpec spec,
+      RowResolver inputRR,
+      Operator input) throws SemanticException{
+    ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    List<String> outputColumnNames = new ArrayList<String>();
+    StringBuilder orderString = new StringBuilder();
+
+    ArrayList<PartitionExpression> partColList = spec.getQueryPartitionSpec().getExpressions();
+    for (PartitionExpression partCol : partColList) {
+      ExprNodeDesc partExpr = genExprNodeDesc(partCol.getExpression(), inputRR);
+      partCols.add(partExpr);
+      orderCols.add(partExpr);
+      orderString.append('+');
+    }
+
+    ArrayList<OrderExpression> orderColList = spec.getQueryOrderSpec() == null ?
+        new ArrayList<PTFInvocationSpec.OrderExpression>() :
+          spec.getQueryOrderSpec().getExpressions();
+    for (int i = 0; i < orderColList.size(); i++) {
+      OrderExpression orderCol = orderColList.get(i);
+      org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = orderCol.getOrder();
+      if (order.name().equals("ASC")) {
+        orderString.append('+');
+      } else {
+        orderString.append('-');
+      }
+      ExprNodeDesc orderExpr = genExprNodeDesc(orderCol.getExpression(), inputRR);
+      orderCols.add(orderExpr);
+    }
+
+    ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
+    RowResolver rsNewRR = new RowResolver();
+    int pos = 0;
+    for (ColumnInfo colInfo : colInfoList) {
+        ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
+            .getInternalName(), colInfo.getTabAlias(), colInfo
+            .getIsVirtualCol());
+        valueCols.add(valueColExpr);
+        colExprMap.put(colInfo.getInternalName(), valueColExpr);
+        String outColName = SemanticAnalyzer.getColumnInternalName(pos++);
+        outputColumnNames.add(outColName);
+
+        String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+        ColumnInfo newColInfo = new ColumnInfo(
+            outColName, colInfo.getType(), alias[0],
+            colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+        rsNewRR.put(alias[0], alias[1], newColInfo);
+
+    }
+
+    input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+        .getReduceSinkDesc(orderCols,
+            valueCols, outputColumnNames, false,
+            -1, partCols, orderString.toString(), -1),
+        new RowSchema(inputRR.getColumnInfos()), input), rsNewRR);
+    input.setColumnExprMap(colExprMap);
+
+
+    // Construct the RR for the extract operator
+    RowResolver extractRR = new RowResolver();
+    LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+        new LinkedHashMap<String[], ColumnInfo>();
+    pos = 0;
+
+    for (ColumnInfo colInfo : colInfoList) {
+      String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+      /*
+       * if we have already encountered this colInfo internalName.
+       * We encounter it again because it must be put for the Having clause.
+       * We will add these entries in the end; in a loop on colsAddedByHaving. See below.
+       */
+      if ( colsAddedByHaving.containsKey(alias)) {
+        continue;
+      }
+      ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+      ColumnInfo eColInfo = new ColumnInfo(
+          SemanticAnalyzer.getColumnInternalName(pos++), colInfo.getType(), alias[0],
+          colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+      if ( astNode == null ) {
+        extractRR.put(alias[0], alias[1], eColInfo);
+      }
+      else {
+        /*
+         * in case having clause refers to this column may have been added twice;
+         * once with the ASTNode.toStringTree as the alias
+         * and then with the real alias.
+         */
+        extractRR.putExpression(astNode, eColInfo);
+        if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+          colsAddedByHaving.put(alias, eColInfo);
+        }
+      }
+    }
+
+    for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
+      String[] alias = columnAddedByHaving.getKey();
+      ColumnInfo eColInfo = columnAddedByHaving.getValue();
+      extractRR.put(alias[0], alias[1], eColInfo);
+    }
+
+    /*
+     * b. Construct Extract Operator.
+     */
+    input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new ExtractDesc(
+            new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+                Utilities.ReduceField.VALUE
+                .toString(), "", false)),
+        new RowSchema(inputRR.getColumnInfos()),
+        input), extractRR);
+
+
+    return input;
+  }
+
+
+  public static ArrayList<WindowExpressionSpec> parseSelect(String selectExprStr)
+      throws SemanticException
+  {
+    ASTNode selNode = null;
+    try {
+      ParseDriver pd = new ParseDriver();
+      selNode = pd.parseSelect(selectExprStr, null);
+    } catch (ParseException pe) {
+      throw new SemanticException(pe);
+    }
+
+    ArrayList<WindowExpressionSpec> selSpec = new ArrayList<WindowExpressionSpec>();
+    int childCount = selNode.getChildCount();
+    for (int i = 0; i < childCount; i++) {
+      ASTNode selExpr = (ASTNode) selNode.getChild(i);
+      if (selExpr.getType() != HiveParser.TOK_SELEXPR) {
+        throw new SemanticException(String.format(
+            "Only Select expressions supported in dynamic select list: %s", selectExprStr));
+      }
+      ASTNode expr = (ASTNode) selExpr.getChild(0);
+      if (expr.getType() == HiveParser.TOK_ALLCOLREF) {
+        throw new SemanticException(
+            String.format("'%s' column not allowed in dynamic select list", selectExprStr));
+      }
+      ASTNode aliasNode = selExpr.getChildCount() > 1
+          && selExpr.getChild(1).getType() == HiveParser.Identifier ?
+          (ASTNode) selExpr.getChild(1) : null;
+      String alias = null;
+      if ( aliasNode != null ) {
+        alias = aliasNode.getText();
+      }
+      else {
+        String[] tabColAlias = getColAlias(selExpr, null, null, true, -1);
+        alias = tabColAlias[1];
+      }
+      WindowExpressionSpec exprSpec = new WindowExpressionSpec();
+      exprSpec.setAlias(alias);
+      exprSpec.setExpression(expr);
+      selSpec.add(exprSpec);
+    }
+
+    return selSpec;
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java?rev=1463556&r1=1463555&r2=1463556&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java Tue Apr  2 14:16:34 2013
@@ -492,6 +492,7 @@ public final class TypeCheckProcFactory 
     static HashMap<Integer, String> specialUnaryOperatorTextHashMap;
     static HashMap<Integer, String> specialFunctionTextHashMap;
     static HashMap<Integer, String> conversionFunctionTextHashMap;
+    static HashSet<Integer> windowingTokens;
     static {
       specialUnaryOperatorTextHashMap = new HashMap<Integer, String>();
       specialUnaryOperatorTextHashMap.put(HiveParser.PLUS, "positive");
@@ -522,6 +523,22 @@ public final class TypeCheckProcFactory 
           serdeConstants.TIMESTAMP_TYPE_NAME);
       conversionFunctionTextHashMap.put(HiveParser.TOK_DECIMAL,
           serdeConstants.DECIMAL_TYPE_NAME);
+
+      windowingTokens = new HashSet<Integer>();
+      windowingTokens.add(HiveParser.KW_OVER);
+      windowingTokens.add(HiveParser.TOK_PARTITIONINGSPEC);
+      windowingTokens.add(HiveParser.TOK_DISTRIBUTEBY);
+      windowingTokens.add(HiveParser.TOK_SORTBY);
+      windowingTokens.add(HiveParser.TOK_CLUSTERBY);
+      windowingTokens.add(HiveParser.TOK_WINDOWSPEC);
+      windowingTokens.add(HiveParser.TOK_WINDOWRANGE);
+      windowingTokens.add(HiveParser.TOK_WINDOWVALUES);
+      windowingTokens.add(HiveParser.KW_UNBOUNDED);
+      windowingTokens.add(HiveParser.KW_PRECEDING);
+      windowingTokens.add(HiveParser.KW_FOLLOWING);
+      windowingTokens.add(HiveParser.KW_CURRENT);
+      windowingTokens.add(HiveParser.TOK_TABSORTCOLNAMEASC);
+      windowingTokens.add(HiveParser.TOK_TABSORTCOLNAMEDESC);
     }
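
The token set above is consulted in the expression walker (last hunk of this file) to skip window bookkeeping tokens entirely, leaving them for the PTF translator. The guard pattern in a self-contained form, with string tokens standing in for parser ids:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class WindowTokenGuardSketch {
      static final Set<String> WINDOWING_TOKENS = new HashSet<String>(
          Arrays.asList("KW_OVER", "TOK_WINDOWSPEC", "TOK_PARTITIONINGSPEC",
                        "KW_PRECEDING", "KW_FOLLOWING", "KW_CURRENT", "KW_UNBOUNDED"));

      static String process(String token) {
        if (WINDOWING_TOKENS.contains(token)) {
          return null;                         // left for the PTF translator to handle
        }
        return "desc(" + token + ")";          // normal expression translation
      }

      public static void main(String[] args) {
        System.out.println(process("KW_OVER"));           // null
        System.out.println(process("TOK_TABLE_OR_COL"));  // desc(TOK_TABLE_OR_COL)
      }
    }
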
 
     private static boolean isRedundantConversionFunction(ASTNode expr,
@@ -865,6 +882,20 @@ public final class TypeCheckProcFactory 
 
       ASTNode expr = (ASTNode) nd;
 
+      /*
+       * A Windowing specification gets added as a child of a UDAF invocation to distinguish it
+       * from similar UDAFs over different windows.
+       * The UDAF is translated to a WindowFunction invocation in the PTFTranslator.
+       * So here we just return null for tokens that appear in a Window Specification.
+       * When the traversal reaches up to the UDAF invocation, its ExprNodeDesc is built using the
+       * ColumnInfo in the InputRR. This is similar to how UDAFs are handled in Select lists.
+       * The difference is that there is no translation for Window related tokens, so we just
+       * return null.
+       */
+      if ( windowingTokens.contains(expr.getType())) {
+        return null;
+      }
+
       if (expr.getType() == HiveParser.TOK_TABNAME) {
         return null;
       }
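
The TypeCheckProcFactory change above makes the generic expression type-checker short-circuit on windowing tokens. Here is a minimal sketch of that pattern, with int constants standing in for the HiveParser token types and a String standing in for ExprNodeDesc (all names invented for illustration):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class SkipTokenSketch {
      // stand-ins for HiveParser.KW_OVER, TOK_PARTITIONINGSPEC, TOK_SORTBY, ...
      static final int KW_OVER = 1, TOK_PARTITIONINGSPEC = 2, TOK_SORTBY = 3, TOK_COLREF = 4;

      static final Set<Integer> WINDOWING_TOKENS =
          new HashSet<Integer>(Arrays.asList(KW_OVER, TOK_PARTITIONINGSPEC, TOK_SORTBY));

      // mimics the processor: windowing tokens yield no translation (null);
      // everything else would be translated as usual
      static Object process(int tokenType) {
        if (WINDOWING_TOKENS.contains(tokenType)) {
          return null; // handled later by the PTFTranslator, not here
        }
        return "expr(" + tokenType + ")"; // placeholder for a real ExprNodeDesc
      }

      public static void main(String[] args) {
        System.out.println(process(KW_OVER));    // null
        System.out.println(process(TOK_COLREF)); // expr(4)
      }
    }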

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+
+/*
+ * Break up the original WindowingSpec into a set of component WindowingSpecs.
+ * Each component WindowingSpec is executed in an instance of PTFOperator,
+ * preceded by ReduceSink and Extract.
+ * The logic to componentize is straightforward:
+ * - distribute the Window Fn. Specs from the original Window Spec into a set of WindowSpecs,
+ *   based on their Partitioning.
+ * - a Group of WindowSpecs is the subset of the Window Fn Invocations in the QueryBlock
+ *   that have the same Partitioning (Partition + Order spec).
+ * - each Group is put in a new WindowingSpec and is evaluated in its own PTFOperator instance.
+ * - the order of computation is then inferred from the dependencies between Groups.
+ *   If 2 Groups have the same dependencies, the Group with the function that appears
+ *   earliest in the Select List is executed first.
+ */
+public class WindowingComponentizer {
+
+  WindowingSpec originalSpec;
+  LinkedHashMap<PartitioningSpec, WindowingSpec> groups;
+
+  public WindowingComponentizer(WindowingSpec originalSpec) throws SemanticException {
+    super();
+    this.originalSpec = originalSpec;
+    groups = new LinkedHashMap<PartitioningSpec, WindowingSpec>();
+    groupFunctions();
+  }
+
+  private void groupFunctions() throws SemanticException {
+    for (WindowExpressionSpec expr : originalSpec.getWindowExpressions()) {
+      WindowFunctionSpec wFn = (WindowFunctionSpec) expr;
+      PartitioningSpec wFnGrp = wFn.getWindowSpec().getPartitioning();
+      WindowingSpec wSpec = groups.get(wFnGrp);
+      if (wSpec == null) {
+        wSpec = new WindowingSpec();
+        groups.put(wFnGrp, wSpec);
+      }
+      wSpec.addWindowFunction(wFn);
+    }
+  }
+
+  public boolean hasNext() {
+    return !groups.isEmpty();
+  }
+
+  public WindowingSpec next(HiveConf hCfg,
+      SemanticAnalyzer semAly,
+      UnparseTranslator unparseT,
+      RowResolver inputRR) throws SemanticException {
+
+    SemanticException originalException = null;
+
+    Iterator<Map.Entry<PartitioningSpec, WindowingSpec>> grpIt = groups.entrySet().iterator();
+    while (grpIt.hasNext()) {
+      Map.Entry<PartitioningSpec, WindowingSpec> entry = grpIt.next();
+      WindowingSpec wSpec = entry.getValue();
+      try {
+        PTFTranslator t = new PTFTranslator();
+        t.translate(wSpec, semAly, hCfg, inputRR, unparseT);
+        grpIt.remove(); // remove through the iterator that is still in use
+        return wSpec;
+      } catch (SemanticException se) {
+        originalException = se;
+      }
+    }
+
+    throw new SemanticException("Failed to breakup Windowing invocations into Groups. " +
+        "At least 1 group must only depend on input columns. " +
+        "Also check for circular dependencies.\n" +
+        "Underlying error: " + originalException.getMessage());
+  }
+
+}
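
To see the grouping step of WindowingComponentizer in isolation, here is a minimal, self-contained sketch; plain Strings stand in for the PartitioningSpec keys and WindowFunctionSpec values (none of these names exist in Hive):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupingSketch {
      public static void main(String[] args) {
        // each entry: {function, partitioning}; two distinct partitionings below
        String[][] fns = {
            {"rank()",    "partition by deptno order by sal"},
            {"sum(sal)",  "partition by deptno order by sal"},
            {"count(*)",  "partition by job"}
        };
        // LinkedHashMap keeps the first-seen order of groups, as in groupFunctions()
        Map<String, List<String>> groups = new LinkedHashMap<String, List<String>>();
        for (String[] fn : fns) {
          List<String> group = groups.get(fn[1]);
          if (group == null) {
            group = new ArrayList<String>();
            groups.put(fn[1], group);
          }
          group.add(fn[0]);
        }
        // two groups => two PTFOperator instances, each with its own shuffle
        System.out.println(groups);
      }
    }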

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingExprNodeEvaluatorFactory.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.PTFTranslator.LeadLagInfo;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLag;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag.GenericUDFLead;
+
+/*
+ * When constructing the Evaluator Tree from an ExprNode Tree:
+ * - look for any descendant Lead/Lag Function Expressions;
+ * - if any are found:
+ *   - add them to LLInfo.leadLagExprs, and
+ *   - add a mapping from the Expr Tree root to the Lead/Lag Func Expr in LLInfo.mapTopExprToLLFunExprs
+ */
+public class WindowingExprNodeEvaluatorFactory {
+
+  public static ExprNodeEvaluator get(LeadLagInfo llInfo, ExprNodeDesc desc) throws HiveException
+  {
+    FindLeadLagFuncExprs visitor = new FindLeadLagFuncExprs(llInfo, desc);
+    new ExprNodeWalker(visitor).walk(desc);
+    return ExprNodeEvaluatorFactory.get(desc);
+  }
+
+  public static class FindLeadLagFuncExprs
+  {
+    ExprNodeDesc topExpr;
+    LeadLagInfo llInfo;
+
+    FindLeadLagFuncExprs(LeadLagInfo llInfo, ExprNodeDesc topExpr)
+    {
+      this.llInfo = llInfo;
+      this.topExpr = topExpr;
+    }
+
+    public void visit(ExprNodeGenericFuncDesc fnExpr) throws HiveException
+    {
+      GenericUDF fn = fnExpr.getGenericUDF();
+      if (fn instanceof GenericUDFLead || fn instanceof GenericUDFLag )
+      {
+        llInfo.addLLFuncExprForTopExpr(topExpr, fnExpr);
+      }
+    }
+  }
+
+  static class ExprNodeWalker
+  {
+    FindLeadLagFuncExprs visitor;
+
+    public ExprNodeWalker(FindLeadLagFuncExprs visitor)
+    {
+      super();
+      this.visitor = visitor;
+    }
+
+    public void walk(ExprNodeDesc e) throws HiveException
+    {
+      if ( e == null ) {
+        return;
+      }
+      List<ExprNodeDesc>  children = e.getChildren();
+      if ( children != null )
+      {
+        for(ExprNodeDesc child : children)
+        {
+          walk(child);
+        }
+      }
+
+      if ( e instanceof ExprNodeGenericFuncDesc)
+      {
+        visitor.visit((ExprNodeGenericFuncDesc)e);
+      }
+    }
+  }
+}
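
The walker above is a plain post-order traversal that collects matching function nodes. A minimal stand-alone sketch of the same pattern, with a toy Node type in place of ExprNodeDesc and a String match in place of the GenericUDFLead/GenericUDFLag instanceof checks:

    import java.util.ArrayList;
    import java.util.List;

    public class WalkSketch {
      static class Node {
        String op;
        List<Node> children = new ArrayList<Node>();
        Node(String op, Node... cs) { this.op = op; for (Node c : cs) children.add(c); }
      }

      // post-order walk: visit children first, then test the node itself,
      // as ExprNodeWalker does before calling FindLeadLagFuncExprs.visit()
      static void walk(Node e, String match, List<Node> found) {
        if (e == null) return;
        for (Node c : e.children) walk(c, match, found);
        if (e.op.equals(match)) found.add(e);
      }

      public static void main(String[] args) {
        Node expr = new Node("+", new Node("lag", new Node("col")), new Node("lit"));
        List<Node> lags = new ArrayList<Node>();
        walk(expr, "lag", lags);
        System.out.println(lags.size()); // 1 -> would be registered in LLInfo
      }
    }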

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1463556&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Tue Apr  2 14:16:34 2013
@@ -0,0 +1,719 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.antlr.runtime.CommonToken;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+
+/*
+ * Captures the Window processing specified in a Query. A Query may
+ * contain:
+ * - UDAF invocations on a Window.
+ * - Lead/Lag function invocations that can only be evaluated in a
+ *   Partition.
+ * - For Queries that don't have a Group By, all UDAF invocations are
+ *   treated as Window Function invocations.
+ * - For Queries that don't have a Group By, the Having condition is
+ *   handled as a post processing step on the rows output by the Windowing
+ *   processing.
+ * This class is a container for all the Select Expressions that are
+ * to be handled by Windowing. These are held in 2 lists: the functions
+ * list holds Window Function invocations; the expressions list holds
+ * Select Expressions having Lead/Lag function calls. It may also
+ * contain an ASTNode representing the post filter to apply on the
+ * output of the Window Functions.
+ * It also contains all the Windows defined in the Query. One of
+ * the Windows is designated as the 'default' Window. If the Query has a
+ * Distribute By/Cluster By clause, the information in these
+ * clauses is captured as a Partitioning and used as the default Window
+ * for the Query. Otherwise the first Window specified is treated as the
+ * default.
+ * Finally, it maintains a Map from an 'alias' to the ASTNode that
+ * represents the Select Expression that was translated to a Window
+ * Function invocation or a Window Expression. This is used when
+ * building RowResolvers.
+ */
+public class WindowingSpec {
+  HashMap<String, WindowExpressionSpec> aliasToWdwExpr;
+  HashMap<String, WindowSpec> windowSpecs;
+  ArrayList<WindowExpressionSpec> windowExpressions;
+
+  public void addWindowSpec(String name, WindowSpec wdwSpec) {
+    windowSpecs = windowSpecs == null ? new HashMap<String, WindowSpec>() : windowSpecs;
+    windowSpecs.put(name, wdwSpec);
+  }
+
+  public void addExpression(ASTNode expr, String alias) {
+    windowExpressions = windowExpressions == null ?
+        new ArrayList<WindowExpressionSpec>() : windowExpressions;
+    aliasToWdwExpr = aliasToWdwExpr == null ?
+        new HashMap<String, WindowExpressionSpec>() : aliasToWdwExpr;
+    WindowExpressionSpec wExprSpec = new WindowExpressionSpec();
+    wExprSpec.setAlias(alias);
+    wExprSpec.setExpression(expr);
+
+    windowExpressions.add(wExprSpec);
+    aliasToWdwExpr.put(alias, wExprSpec);
+  }
+
+  public void addWindowFunction(WindowFunctionSpec wFn) {
+    windowExpressions = windowExpressions == null ?
+        new ArrayList<WindowExpressionSpec>() : windowExpressions;
+    aliasToWdwExpr = aliasToWdwExpr == null ?
+        new HashMap<String, WindowExpressionSpec>() : aliasToWdwExpr;
+    windowExpressions.add(wFn);
+    aliasToWdwExpr.put(wFn.getAlias(), wFn);
+  }
+
+  public HashMap<String, WindowExpressionSpec> getAliasToWdwExpr() {
+    return aliasToWdwExpr;
+  }
+
+  public void setAliasToWdwExpr(HashMap<String, WindowExpressionSpec> aliasToWdwExpr) {
+    this.aliasToWdwExpr = aliasToWdwExpr;
+  }
+
+  public HashMap<String, WindowSpec> getWindowSpecs() {
+    return windowSpecs;
+  }
+
+  public void setWindowSpecs(HashMap<String, WindowSpec> windowSpecs) {
+    this.windowSpecs = windowSpecs;
+  }
+
+  public ArrayList<WindowExpressionSpec> getWindowExpressions() {
+    return windowExpressions;
+  }
+
+  public void setWindowExpressions(ArrayList<WindowExpressionSpec> windowExpressions) {
+    this.windowExpressions = windowExpressions;
+  }
+
+  public PartitioningSpec getQueryPartitioningSpec() {
+    /*
+     * Why no null and class checks?
+     * With the new design a WindowingSpec must contain a WindowFunctionSpec.
+     * todo: cleanup datastructs.
+     */
+    WindowFunctionSpec wFn = (WindowFunctionSpec) getWindowExpressions().get(0);
+    return wFn.getWindowSpec().getPartitioning();
+  }
+
+  public PartitionSpec getQueryPartitionSpec() {
+    return getQueryPartitioningSpec().getPartSpec();
+  }
+
+  public OrderSpec getQueryOrderSpec() {
+    return getQueryPartitioningSpec().getOrderSpec();
+  }
+
+  /*
+   * Apply the rules in the Spec to fill in any missing pieces of every Window Specification,
+   * and validate that the effective Specification is valid. The rules applied are:
+   * - For Wdw Specs that refer to Window Defns, inherit missing components.
+   * - A Window Spec with no Partition Spec is Partitioned on a Constant (the number 0).
+   * - For missing Wdw Frames, or for Frames with only a Start Boundary, completely specify
+   *   them by the rules in {@link effectiveWindowFrame}.
+   * - Validate the effective Window Frames with the rules in {@link validateWindowFrame}.
+   * - If there is no Order, then add the Partition expressions as the Order.
+   */
+  public void validateAndMakeEffective() throws SemanticException {
+    for(WindowExpressionSpec expr : getWindowExpressions()) {
+      WindowFunctionSpec wFn = (WindowFunctionSpec) expr;
+      WindowSpec wdwSpec = wFn.getWindowSpec();
+
+      // 1. For Wdw Specs that refer to Window Defns, inherit missing components
+      if ( wdwSpec != null ) {
+        ArrayList<String> sources = new ArrayList<String>();
+        fillInWindowSpec(wdwSpec.getSourceId(), wdwSpec, sources);
+      }
+
+      if ( wdwSpec == null ) {
+        wdwSpec = new WindowSpec();
+        wFn.setWindowSpec(wdwSpec);
+      }
+
+      // 2. A Window Spec with no Partition Spec is Partitioned on a Constant (the number 0)
+      applyConstantPartition(wdwSpec);
+
+      // 3. For missing Wdw Frames or for Frames with only a Start Boundary, completely
+      //    specify them by the rules in {@link effectiveWindowFrame}
+      effectiveWindowFrame(wFn, wdwSpec);
+
+      // 4. Validate the effective Window Frames with the rules in {@link validateWindowFrame}
+      validateWindowFrame(wdwSpec);
+
+      // 5. If there is no Order, then add the Partition expressions as the Order.
+      wdwSpec.ensureOrderSpec();
+    }
+  }
+
+  private void fillInWindowSpec(String sourceId, WindowSpec dest, ArrayList<String> visited)
+      throws SemanticException
+  {
+    if (sourceId != null)
+    {
+      if ( visited.contains(sourceId)) {
+        visited.add(sourceId);
+        throw new SemanticException(String.format("Cycle in Window references %s", visited));
+      }
+      WindowSpec source = getWindowSpecs().get(sourceId);
+      if (source == null || source.equals(dest))
+      {
+        throw new SemanticException(String.format("Window Spec %s refers to an unknown source " ,
+            dest));
+      }
+
+      if (dest.getPartition() == null)
+      {
+        dest.setPartition(source.getPartition());
+      }
+
+      if (dest.getOrder() == null)
+      {
+        dest.setOrder(source.getOrder());
+      }
+
+      if (dest.getWindowFrame() == null)
+      {
+        dest.setWindowFrame(source.getWindowFrame());
+      }
+
+      visited.add(sourceId);
+
+      fillInWindowSpec(source.getSourceId(), dest, visited);
+    }
+  }
+
+  private void applyConstantPartition(WindowSpec wdwSpec) {
+    PartitionSpec partSpec = wdwSpec.getPartition();
+    if ( partSpec == null ) {
+      partSpec = new PartitionSpec();
+      PartitionExpression partExpr = new PartitionExpression();
+      partExpr.setExpression(new ASTNode(new CommonToken(HiveParser.Number, "0")));
+      partSpec.addExpression(partExpr);
+      wdwSpec.setPartition(partSpec);
+    }
+  }
+
+  /*
+   * - A Window Frame that has only a start boundary is interpreted as:
+   *       BETWEEN <start boundary> AND CURRENT ROW
+   * - A Window Specification with an Order Specification and no Window
+   *   Frame is interpreted as:
+   *       RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+   * - A Window Specification with no Order and no Window Frame is interpreted as:
+   *       ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+   */
+  private void effectiveWindowFrame(WindowFunctionSpec wFn, WindowSpec wdwSpec) {
+
+    WindowFunctionInfo wFnInfo = FunctionRegistry.getWindowFunctionInfo(wFn.getName());
+    boolean supportsWindowing = wFnInfo == null || wFnInfo.isSupportsWindow();
+    WindowFrameSpec wFrame = wdwSpec.getWindowFrame();
+    OrderSpec orderSpec = wdwSpec.getOrder();
+    if ( wFrame == null ) {
+      // no Order, or a function that doesn't support windowing: the
+      // effective frame is the entire partition.
+      if ( !supportsWindowing || orderSpec == null ) {
+        wFrame = new WindowFrameSpec(
+            new RangeBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT),
+            new RangeBoundarySpec(Direction.FOLLOWING, BoundarySpec.UNBOUNDED_AMOUNT)
+            );
+      }
+      else {
+        // an Order with no explicit frame: value (range) based, up to the current row.
+        wFrame = new WindowFrameSpec(
+            new ValueBoundarySpec(Direction.PRECEDING, BoundarySpec.UNBOUNDED_AMOUNT),
+            new CurrentRowSpec()
+            );
+      }
+      wdwSpec.setWindowFrame(wFrame);
+    }
+    else if ( wFrame.getEnd() == null ) {
+      wFrame.setEnd(new CurrentRowSpec());
+    }
+  }
+
+  private void validateWindowFrame(WindowSpec wdwSpec) throws SemanticException {
+    WindowFrameSpec wFrame = wdwSpec.getWindowFrame();
+    BoundarySpec start = wFrame.getStart();
+    BoundarySpec end = wFrame.getEnd();
+
+    if ( start.getDirection() == Direction.FOLLOWING &&
+        start.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT ) {
+      throw new SemanticException("Start of a WindowFrame cannot be UNBOUNDED FOLLOWING");
+    }
+
+    if ( end.getDirection() == Direction.PRECEDING &&
+        end.getAmt() == BoundarySpec.UNBOUNDED_AMOUNT ) {
+      throw new SemanticException("End of a WindowFrame cannot be UNBOUNDED PRECEDING");
+    }
+
+    validateValueBoundary(wFrame.getStart(), wdwSpec.getOrder());
+    validateValueBoundary(wFrame.getEnd(), wdwSpec.getOrder());
+  }
+
+  private void validateValueBoundary(BoundarySpec bs, OrderSpec order) throws SemanticException {
+    if ( bs instanceof ValueBoundarySpec ) {
+      ValueBoundarySpec vbs = (ValueBoundarySpec) bs;
+      // a value (range) boundary is relative to the Sort Key, so there must be
+      // exactly one Order expression.
+      if ( order == null || order.getExpressions().isEmpty() ) {
+        throw new SemanticException("Range based Window Frame requires an Order specification");
+      }
+      if ( order.getExpressions().size() > 1 ) {
+        throw new SemanticException("Range based Window Frame can have only 1 Sort Key");
+      }
+      vbs.setExpression(order.getExpressions().get(0).getExpression());
+    }
+  }
+
+  /*
+   * Represents a Select Expression in the context of Windowing. These can
+   * refer to the output of Windowing Functions and can navigate the
+   * Partition using Lead/Lag functions.
+   */
+  public static class WindowExpressionSpec {
+    String alias;
+    ASTNode expression;
+    public String getAlias() {
+      return alias;
+    }
+    public void setAlias(String alias) {
+      this.alias = alias;
+    }
+    public ASTNode getExpression() {
+      return expression;
+    }
+    public void setExpression(ASTNode expression) {
+      this.expression = expression;
+    }
+  }
+
+  /*
+   * Represents a UDAF invocation in the context of a Window Frame. As
+   * explained above, sometimes UDAFs are handled as Window Functions
+   * even without an explicit Window specification. This is to support Queries
+   * that have no Group By clause. A Window Function invocation captures:
+   * - the ASTNode that represents this invocation
+   * - its name
+   * - whether it is a star/distinct invocation
+   * - its alias
+   * - and an optional Window specification
+   */
+  public static class WindowFunctionSpec extends WindowExpressionSpec
+  {
+    String name;
+    boolean isStar;
+    boolean isDistinct;
+    ArrayList<ASTNode> args;
+    WindowSpec windowSpec;
+
+    public String getName() {
+      return name;
+    }
+    public void setName(String name) {
+      this.name = name;
+    }
+    public boolean isStar() {
+      return isStar;
+    }
+    public void setStar(boolean isStar) {
+      this.isStar = isStar;
+    }
+    public boolean isDistinct() {
+      return isDistinct;
+    }
+    public void setDistinct(boolean isDistinct) {
+      this.isDistinct = isDistinct;
+    }
+    public ArrayList<ASTNode> getArgs() {
+      args = args == null ? new ArrayList<ASTNode>() : args;
+      return args;
+    }
+    public void setArgs(ArrayList<ASTNode> args) {
+      this.args = args;
+    }
+    public void addArg(ASTNode arg) {
+      args = args == null ? new ArrayList<ASTNode>() : args;
+      args.add(arg);
+    }
+    public WindowSpec getWindowSpec() {
+      return windowSpec;
+    }
+    public void setWindowSpec(WindowSpec windowSpec) {
+      this.windowSpec = windowSpec;
+    }
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append(name).append("(");
+      if (isStar )
+      {
+        buf.append("*");
+      }
+      else
+      {
+        if ( isDistinct )
+        {
+          buf.append("distinct ");
+        }
+        if ( args != null )
+        {
+          boolean first = true;
+          for(ASTNode arg : args)
+          {
+            if ( first) {
+              first = false;
+            } else {
+              buf.append(", ");
+            }
+            buf.append(arg.toStringTree());
+          }
+        }
+      }
+
+      buf.append(")");
+
+      if ( windowSpec != null )
+      {
+        buf.append(" ").append(windowSpec.toString());
+      }
+
+      if ( alias != null )
+      {
+        buf.append(" as ").append(alias);
+      }
+
+      return buf.toString();
+    }
+
+  }
+
+  /*
+   * Represents a WindowFrame applied to a Partitioning. A Window can
+   * refer to a <i>source</i> Window by name. The source Window provides the
+   * basis for this Window definition, which extends/overrides the
+   * <i>source</i> Window definition. For example, the Select Expression
+   * sum(p_retailprice) over (w1) is translated into a
+   * WindowFunction instance whose Window specification refers
+   * to the global Window Specification 'w1'. The Function's specification
+   * has no content of its own, but inherits all its attributes from 'w1'
+   * during subsequent phases of translation.
+   */
+  public static class WindowSpec
+  {
+    String sourceId;
+    PartitioningSpec partitioning;
+    WindowFrameSpec windowFrame;
+    public String getSourceId() {
+      return sourceId;
+    }
+    public void setSourceId(String sourceId) {
+      this.sourceId = sourceId;
+    }
+    public PartitioningSpec getPartitioning() {
+      return partitioning;
+    }
+    public void setPartitioning(PartitioningSpec partitioning) {
+      this.partitioning = partitioning;
+    }
+    public WindowFrameSpec getWindowFrame() {
+      return windowFrame;
+    }
+    public void setWindowFrame(WindowFrameSpec windowFrame) {
+      this.windowFrame = windowFrame;
+    }
+    public PartitionSpec getPartition() {
+      return getPartitioning() == null ? null : getPartitioning().getPartSpec();
+    }
+    public void setPartition(PartitionSpec partSpec) {
+      partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+      partitioning.setPartSpec(partSpec);
+    }
+    public OrderSpec getOrder() {
+      return getPartitioning() == null ? null : getPartitioning().getOrderSpec();
+    }
+    public void setOrder(OrderSpec orderSpec) {
+      partitioning = partitioning == null ? new PartitioningSpec() : partitioning;
+      partitioning.setOrderSpec(orderSpec);
+    }
+    /*
+     * When there is no Order specified, we add the Partition expressions as
+     * Order expressions. This is an implementation artifact: UDAFs that
+     * imply order (like rank, dense_rank) depend on the Order Expressions
+     * to work. Internally we pass the Order Expressions as Args to these
+     * functions. We could change the translation so that the Functions are
+     * set up with Partition expressions when the OrderSpec is null; but for
+     * now we set up an OrderSpec that copies the Partition expressions.
+     */
+    protected void ensureOrderSpec() {
+      if ( getOrder() == null ) {
+        OrderSpec order = new OrderSpec();
+        order.prefixBy(getPartition());
+        setOrder(order);
+      }
+    }
+  };
+
+  /*
+   * A WindowFrame specifies the Range on which a Window Function should
+   * be applied for the 'current' row. It is specified by a <i>start</i>
+   * and an <i>end</i> Boundary.
+   */
+  public static class WindowFrameSpec
+  {
+    BoundarySpec start;
+    BoundarySpec end;
+
+    public WindowFrameSpec() {
+    }
+
+    public WindowFrameSpec(BoundarySpec start, BoundarySpec end)
+    {
+      super();
+      this.start = start;
+      this.end = end;
+    }
+
+    public WindowFrameSpec(BoundarySpec start)
+    {
+      this(start, null);
+    }
+
+    public BoundarySpec getStart()
+    {
+      return start;
+    }
+
+    public void setStart(BoundarySpec start)
+    {
+      this.start = start;
+    }
+
+    public BoundarySpec getEnd()
+    {
+      return end;
+    }
+
+    public void setEnd(BoundarySpec end)
+    {
+      this.end = end;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("window(start=%s, end=%s)", start, end);
+    }
+
+  }
+
+  public static enum Direction
+  {
+    PRECEDING,
+    CURRENT,
+    FOLLOWING
+  };
+
+  /*
+   * A Boundary specifies how far back/forward a WindowFrame extends from the
+   * current row. A Boundary is specified as one of:
+   * - Range Boundary :: the number of rows to go forward or back from
+   *                     the Current Row.
+   * - Current Row :: the Boundary is at the current row.
+   * - Value Boundary :: the amount by which the value of an
+   *                     Expression may decrease/increase.
+   */
+  public abstract static class BoundarySpec implements Comparable<BoundarySpec>
+  {
+    public static final int UNBOUNDED_AMOUNT = Integer.MAX_VALUE;
+
+    public abstract Direction getDirection();
+    public abstract void setDirection(Direction dir);
+    public abstract void setAmt(int amt);
+    public abstract int getAmt();
+  }
+
+  public static class RangeBoundarySpec extends BoundarySpec
+  {
+
+    Direction direction;
+    int amt;
+
+    public RangeBoundarySpec() {
+    }
+
+    public RangeBoundarySpec(Direction direction, int amt)
+    {
+      super();
+      this.direction = direction;
+      this.amt = amt;
+    }
+
+    @Override
+    public Direction getDirection()
+    {
+      return direction;
+    }
+
+    @Override
+    public void setDirection(Direction direction)
+    {
+      this.direction = direction;
+    }
+
+    @Override
+    public int getAmt()
+    {
+      return amt;
+    }
+
+    @Override
+    public void setAmt(int amt)
+    {
+      this.amt = amt;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("range(%s %s)", (amt == UNBOUNDED_AMOUNT ? "Unbounded" : amt),
+          direction);
+    }
+
+    public int compareTo(BoundarySpec other)
+    {
+      int c = direction.compareTo(other.getDirection());
+      if (c != 0) {
+        return c;
+      }
+      RangeBoundarySpec rb = (RangeBoundarySpec) other;
+      return amt - rb.amt;
+    }
+
+  }
+
+  public static class CurrentRowSpec extends BoundarySpec
+  {
+    public CurrentRowSpec() {
+    }
+
+    @Override
+    public String toString()
+    {
+      return "currentRow";
+    }
+
+    @Override
+    public Direction getDirection() {
+      return Direction.CURRENT;
+    }
+
+    @Override
+    public void setDirection(Direction dir) {}
+    @Override
+    public void setAmt(int amt) {}
+
+    public int compareTo(BoundarySpec other)
+    {
+      return getDirection().compareTo(other.getDirection());
+    }
+
+    @Override
+    public int getAmt() {return 0;}
+  }
+
+  public static class ValueBoundarySpec extends BoundarySpec
+  {
+    Direction direction;
+    ASTNode expression;
+    int amt;
+
+    public ValueBoundarySpec() {
+    }
+
+    public ValueBoundarySpec(Direction direction, int amt)
+    {
+      super();
+      this.direction = direction;
+      this.amt = amt;
+    }
+
+    @Override
+    public Direction getDirection()
+    {
+      return direction;
+    }
+
+    @Override
+    public void setDirection(Direction direction)
+    {
+      this.direction = direction;
+    }
+
+    public ASTNode getExpression()
+    {
+      return expression;
+    }
+
+    public void setExpression(ASTNode expression)
+    {
+      this.expression = expression;
+    }
+
+    @Override
+    public int getAmt()
+    {
+      return amt;
+    }
+
+    @Override
+    public void setAmt(int amt)
+    {
+      this.amt = amt;
+    }
+
+    @Override
+    public String toString()
+    {
+      return String.format("value(%s %s %s)", expression.toStringTree(), amt, direction);
+    }
+
+    public int compareTo(BoundarySpec other)
+    {
+      int c = direction.compareTo(other.getDirection());
+      if (c != 0) {
+        return c;
+      }
+      ValueBoundarySpec vb = (ValueBoundarySpec) other;
+      return amt - vb.amt;
+    }
+
+  }
+
+}
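
The frame-defaulting rules in effectiveWindowFrame() above can be restated compactly. The sketch below is illustrative only: its class and method names are invented, and it uses standard SQL ROWS/RANGE wording for what the code expresses with RangeBoundarySpec (row counts) and ValueBoundarySpec (value based):

    public class EffectiveFrameSketch {
      // returns the effective frame as text, following the rules above
      static String effectiveFrame(boolean hasFrame, boolean hasEndBoundary,
          boolean hasOrder, boolean fnSupportsWindow) {
        if (!hasFrame) {
          if (!fnSupportsWindow || !hasOrder) {
            return "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING";
          }
          return "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW";
        }
        if (!hasEndBoundary) {
          return "<start boundary> .. CURRENT ROW";
        }
        return "<as specified>";
      }

      public static void main(String[] args) {
        // e.g. sum(x) over (partition by y order by z) -> range up to current row
        System.out.println(effectiveFrame(false, false, true, true));
        // e.g. count(*) over (partition by y) -> whole partition
        System.out.println(effectiveFrame(false, false, false, true));
      }
    }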