Posted to commits@hive.apache.org by am...@apache.org on 2013/04/05 12:34:11 UTC

svn commit: r1464915 [3/5] - in /hive/branches/HIVE-4115: ./ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/ hcatalog/build-support/ant/ hcatalog/historical/branches/ hcatalog/historical/sit...

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Apr  5 10:34:08 2013
@@ -35,6 +35,8 @@ import java.util.regex.PatternSyntaxExce
 
 import org.antlr.runtime.tree.BaseTree;
 import org.antlr.runtime.tree.Tree;
+import org.antlr.runtime.tree.TreeWizard;
+import org.antlr.runtime.tree.TreeWizard.ContextVisitor;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
@@ -80,6 +82,7 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
@@ -117,6 +120,24 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcContext;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec.SpecType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.OrderSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFInputSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PTFQueryInputType;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionExpression;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitionedTableFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.CurrentRowSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.RangeBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.ValueBoundarySpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFrameSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowSpec;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -152,6 +173,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.OrderExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
+import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -198,6 +223,8 @@ public class SemanticAnalyzer extends Ba
   private Map<JoinOperator, QBJoinTree> joinContext;
   private Map<SMBMapJoinOperator, QBJoinTree> smbMapJoinContext;
   private final HashMap<TableScanOperator, Table> topToTable;
+  private final Map<FileSinkOperator, Table> fsopToTable;
+  private final List<ReduceSinkOperator> reduceSinkOperatorsAddedByEnforceBucketingSorting;
   private QB qb;
   private ASTNode ast;
   private int destTableId;
@@ -259,6 +286,8 @@ public class SemanticAnalyzer extends Ba
     joinContext = new HashMap<JoinOperator, QBJoinTree>();
     smbMapJoinContext = new HashMap<SMBMapJoinOperator, QBJoinTree>();
     topToTable = new HashMap<TableScanOperator, Table>();
+    fsopToTable = new HashMap<FileSinkOperator, Table>();
+    reduceSinkOperatorsAddedByEnforceBucketingSorting = new ArrayList<ReduceSinkOperator>();
     destTableId = 1;
     uCtx = null;
     listMapJoinOpsNoReducer = new ArrayList<AbstractMapJoinOperator<? extends MapJoinDesc>>();
@@ -317,11 +346,14 @@ public class SemanticAnalyzer extends Ba
 
   public ParseContext getParseContext() {
     return new ParseContext(conf, qb, ast, opToPartPruner, opToPartList, topOps,
-        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable, loadTableWork,
+        topSelOps, opParseCtx, joinContext, smbMapJoinContext, topToTable,
+        fsopToTable, loadTableWork,
         loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
-        opToPartToSkewedPruner, viewAliasToInput);
+        opToPartToSkewedPruner, viewAliasToInput,
+        reduceSinkOperatorsAddedByEnforceBucketingSorting,
+        queryProperties);
   }
 
   @SuppressWarnings("nls")
@@ -332,7 +364,9 @@ public class SemanticAnalyzer extends Ba
     switch (ast.getToken().getType()) {
     case HiveParser.TOK_QUERY: {
       QB qb = new QB(id, alias, true);
-      doPhase1(ast, qb, initPhase1Ctx());
+      Phase1Ctx ctx_1 = initPhase1Ctx();
+      doPhase1(ast, qb, ctx_1);
+
       qbexpr.setOpcode(QBExpr.Opcode.NULLOP);
       qbexpr.setQB(qb);
     }
@@ -358,13 +392,28 @@ public class SemanticAnalyzer extends Ba
   }
 
   private LinkedHashMap<String, ASTNode> doPhase1GetAggregationsFromSelect(
-      ASTNode selExpr) {
+      ASTNode selExpr, QB qb, String dest) {
+
     // Iterate over the selects search for aggregation Trees.
     // Use String as keys to eliminate duplicate trees.
     LinkedHashMap<String, ASTNode> aggregationTrees = new LinkedHashMap<String, ASTNode>();
     for (int i = 0; i < selExpr.getChildCount(); ++i) {
-      ASTNode sel = (ASTNode) selExpr.getChild(i).getChild(0);
-      doPhase1GetAllAggregations(sel, aggregationTrees);
+      ASTNode sel = (ASTNode) selExpr.getChild(i);
+      doPhase1GetAllAggregations((ASTNode) sel.getChild(0), aggregationTrees);
+    }
+
+    /*
+     * remove any aggregation to be handled by Windowing.
+     */
+    if ( queryProperties.hasWindowing() && qb.getWindowingSpec(dest) != null ) {
+      HashMap<String, ASTNode> aliasToWdwExprs = qb.getParseInfo().getWindowingExprsForClause(dest);
+      LinkedHashMap<String, ASTNode> aggTreesMinusWindowing = new LinkedHashMap<String, ASTNode>();
+      for(Map.Entry<String,ASTNode> entry : aggregationTrees.entrySet()) {
+        if ( !aliasToWdwExprs.containsKey(entry.getKey())) {
+          aggTreesMinusWindowing.put(entry.getKey(), entry.getValue());
+        }
+      }
+      aggregationTrees = aggTreesMinusWindowing;
     }
     return aggregationTrees;
   }
@@ -650,6 +699,17 @@ public class SemanticAnalyzer extends Ba
         processTable(qb, child);
       } else if (child.getToken().getType() == HiveParser.TOK_SUBQUERY) {
         processSubQuery(qb, child);
+      } else if (child.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) {
+        queryProperties.setHasPTF(true);
+        processPTF(qb, child);
+        PTFInvocationSpec ptfInvocationSpec = qb.getPTFInvocationSpec(child);
+        String inputAlias = ptfInvocationSpec == null ? null :
+          ((PartitionedTableFunctionSpec)ptfInvocationSpec.getFunction()).getAlias();
+        if ( inputAlias == null ) {
+          throw new SemanticException(generateErrorMessage(child,
+              "PTF invocation in a Join must have an alias"));
+        }
+
       } else if (child.getToken().getType() == HiveParser.TOK_LATERAL_VIEW) {
         // SELECT * FROM src1 LATERAL VIEW udtf() AS myTable JOIN src2 ...
         // is not supported. Instead, the lateral view must be in a subquery
@@ -741,11 +801,14 @@ public class SemanticAnalyzer extends Ba
           qbp.setHints((ASTNode) ast.getChild(0));
         }
 
-        LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast);
+        handleWindowingExprsInSelectList(qb, ctx_1.dest, ast);
+
+        LinkedHashMap<String, ASTNode> aggregations = doPhase1GetAggregationsFromSelect(ast,
+            qb, ctx_1.dest);
         doPhase1GetColumnAliasesFromSelect(ast, qbp);
         qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
         qbp.setDistinctFuncExprsForClause(ctx_1.dest,
-            doPhase1GetDistinctFuncExprs(aggregations));
+        doPhase1GetDistinctFuncExprs(aggregations));
         break;
 
       case HiveParser.TOK_WHERE:
@@ -770,7 +833,6 @@ public class SemanticAnalyzer extends Ba
                 .getMsg(ast));
           }
         }
-
         qbp.setDestForClause(ctx_1.dest, (ASTNode) ast.getChild(0));
         break;
 
@@ -793,6 +855,9 @@ public class SemanticAnalyzer extends Ba
           queryProperties.setHasJoin(true);
           processJoin(qb, frm);
           qbp.setJoinExpr(frm);
+        }else if(frm.getToken().getType() == HiveParser.TOK_PTBLFUNCTION){
+          queryProperties.setHasPTF(true);
+          processPTF(qb, frm);
         }
         break;
 
@@ -819,7 +884,7 @@ public class SemanticAnalyzer extends Ba
         break;
 
       case HiveParser.TOK_SORTBY:
-        // Get the sort by aliases - these are aliased to the entries in the
+     // Get the sort by aliases - these are aliased to the entries in the
         // select list
         queryProperties.setHasSortBy(true);
         qbp.setSortByExprForClause(ctx_1.dest, ast);
@@ -873,7 +938,16 @@ public class SemanticAnalyzer extends Ba
 
       case HiveParser.TOK_HAVING:
         qbp.setHavingExprForClause(ctx_1.dest, ast);
-        qbp.addAggregationExprsForClause(ctx_1.dest, doPhase1GetAggregationsFromSelect(ast));
+        qbp.addAggregationExprsForClause(ctx_1.dest,
+            doPhase1GetAggregationsFromSelect(ast, qb, ctx_1.dest));
+        break;
+
+      case HiveParser.KW_WINDOW:
+        if (!qb.hasWindowingSpec(ctx_1.dest) ) {
+          throw new SemanticException(generateErrorMessage(ast,
+              "Query has no Cluster/Distribute By; but has a Window definition"));
+        }
+        handleQueryWindowClauses(qb, ctx_1, ast);
         break;
 
       case HiveParser.TOK_LIMIT:
@@ -2193,12 +2267,20 @@ public class SemanticAnalyzer extends Ba
       List<ASTNode> result = new ArrayList<ASTNode>(selectExprs == null ? 0
           : selectExprs.getChildCount());
       if (selectExprs != null) {
+        HashMap<String, ASTNode> windowingExprs = parseInfo.getWindowingExprsForClause(dest);
+
         for (int i = 0; i < selectExprs.getChildCount(); ++i) {
           if (((ASTNode) selectExprs.getChild(i)).getToken().getType() == HiveParser.TOK_HINTLIST) {
             continue;
           }
           // table.column AS alias
           ASTNode grpbyExpr = (ASTNode) selectExprs.getChild(i).getChild(0);
+          /*
+           * If this is handled by Windowing then ignore it.
+           */
+          if (windowingExprs != null && windowingExprs.containsKey(grpbyExpr.toStringTree())) {
+            continue;
+          }
           result.add(grpbyExpr);
         }
       }
@@ -2225,7 +2307,10 @@ public class SemanticAnalyzer extends Ba
     String tabAlias = null;
     String[] colRef = new String[2];
 
-    if (selExpr.getChildCount() == 2) {
+    //for queries with a windowing expressions, the selexpr may have a third child
+    if (selExpr.getChildCount() == 2 ||
+        (selExpr.getChildCount() == 3 &&
+        selExpr.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC)) {
       // return zz for "xx + yy AS zz"
       colAlias = unescapeIdentifier(selExpr.getChild(1).getText());
       colRef[0] = tabAlias;
@@ -2302,6 +2387,7 @@ public class SemanticAnalyzer extends Ba
     return false;
   }
 
+
   private Operator<?> genSelectPlan(String dest, QB qb, Operator<?> input)
       throws SemanticException {
     ASTNode selExprList = qb.getParseInfo().getSelForClause(dest);
@@ -2439,11 +2525,14 @@ public class SemanticAnalyzer extends Ba
       // child can be EXPR AS ALIAS, or EXPR.
       ASTNode child = (ASTNode) exprList.getChild(i);
       boolean hasAsClause = (!isInTransform) && (child.getChildCount() == 2);
+      boolean isWindowSpec = child.getChildCount() == 3 ?
+          (child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC) :
+            false;
 
       // EXPR AS (ALIAS,...) parses, but is only allowed for UDTF's
       // This check is not needed and invalid when there is a transform b/c the
       // AST's are slightly different.
-      if (!isInTransform && !isUDTF && child.getChildCount() > 2) {
+      if (!isWindowSpec && !isInTransform && !isUDTF && child.getChildCount() > 2) {
         throw new SemanticException(generateErrorMessage(
             (ASTNode) child.getChild(2),
             ErrorMsg.INVALID_AS.getMsg()));
@@ -5180,6 +5269,7 @@ public class SemanticAnalyzer extends Ba
           + dest_path + " row schema: " + inputRR.toString());
     }
 
+    fsopToTable.put((FileSinkOperator) output, dest_tab);
     return output;
   }
 
@@ -5587,6 +5677,7 @@ public class SemanticAnalyzer extends Ba
             partitionCols, order.toString(), numReducers),
         new RowSchema(inputRR.getColumnInfos()), input), inputRR);
     interim.setColumnExprMap(colExprMap);
+    reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator) interim);
 
     // Add the extract operator to get the value fields
     RowResolver out_rwsch = new RowResolver();
@@ -5609,6 +5700,7 @@ public class SemanticAnalyzer extends Ba
       LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() +
           " row schema: " + out_rwsch.toString());
     }
+
     return output;
 
   }
@@ -6290,12 +6382,18 @@ public class SemanticAnalyzer extends Ba
     ASTNode right = (ASTNode) joinParseTree.getChild(1);
 
     if ((left.getToken().getType() == HiveParser.TOK_TABREF)
-        || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
+        || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)
+        || (left.getToken().getType() == HiveParser.TOK_PTBLFUNCTION)) {
       String tableName = getUnescapedUnqualifiedTableName((ASTNode) left.getChild(0))
           .toLowerCase();
       String alias = left.getChildCount() == 1 ? tableName
           : unescapeIdentifier(left.getChild(left.getChildCount() - 1)
-              .getText().toLowerCase());
+          .getText().toLowerCase());
+      // ptf node form is: ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+      // guaranteed to have an alias here: check done in processJoin
+      alias = (left.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) ?
+          unescapeIdentifier(left.getChild(1).getText().toLowerCase()) :
+            alias;
       joinTree.setLeftAlias(alias);
       String[] leftAliases = new String[1];
       leftAliases[0] = alias;
@@ -6321,12 +6419,18 @@ public class SemanticAnalyzer extends Ba
     }
 
     if ((right.getToken().getType() == HiveParser.TOK_TABREF)
-        || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
+        || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)
+        || (right.getToken().getType() == HiveParser.TOK_PTBLFUNCTION)) {
       String tableName = getUnescapedUnqualifiedTableName((ASTNode) right.getChild(0))
           .toLowerCase();
       String alias = right.getChildCount() == 1 ? tableName
           : unescapeIdentifier(right.getChild(right.getChildCount() - 1)
-              .getText().toLowerCase());
+          .getText().toLowerCase());
+      // ptf node form is: ^(TOK_PTBLFUNCTION $name $alias? partitionTableFunctionSource partitioningSpec? expression*)
+      // guaranteed to have an alias here: check done in processJoin
+      alias = (right.getToken().getType() == HiveParser.TOK_PTBLFUNCTION) ?
+          unescapeIdentifier(right.getChild(1).getText().toLowerCase()) :
+            alias;
       String[] rightAliases = new String[1];
       rightAliases[0] = alias;
       joinTree.setRightAliases(rightAliases);
@@ -6430,8 +6534,10 @@ public class SemanticAnalyzer extends Ba
     joinTree.setStreamAliases(streamAliases);
   }
 
-  private void mergeJoins(QB qb, QBJoinTree parent, QBJoinTree node,
-      QBJoinTree target, int pos) {
+  /**
+   * Merges node to target
+   */
+  private void mergeJoins(QB qb, QBJoinTree node, QBJoinTree target, int pos) {
     String[] nodeRightAliases = node.getRightAliases();
     String[] trgtRightAliases = target.getRightAliases();
     String[] rightAliases = new String[nodeRightAliases.length
@@ -6516,12 +6622,6 @@ public class SemanticAnalyzer extends Ba
       filterPos.addAll(node.getFiltersForPushing().get(0));
     }
 
-    if (qb.getQbJoinTree() == node) {
-      qb.setQbJoinTree(node.getJoinSrc());
-    } else {
-      parent.setJoinSrc(node.getJoinSrc());
-    }
-
     if (node.getNoOuterJoin() && target.getNoOuterJoin()) {
       target.setNoOuterJoin(true);
     } else {
@@ -6606,49 +6706,81 @@ public class SemanticAnalyzer extends Ba
     return res;
   }
 
-  private boolean mergeJoinNodes(QB qb, QBJoinTree parent, QBJoinTree node,
-      QBJoinTree target) {
-    if (target == null) {
-      return false;
+  // try to merge the join tree starting from the innermost source
+  // (it was merged from outermost to inner, which could be invalid)
+  //
+  // in a join tree ((A-B)-C)-D where C is not mergeable with A-B,
+  // D can be merged with A-B into a single join if and only if C and D have the same join type.
+  // In that case, the A-B-D join is executed first and the ABD-C join is executed next.
+  private void mergeJoinTree(QB qb) {
+    QBJoinTree tree = qb.getQbJoinTree();
+    if (tree.getJoinSrc() == null) {
+      return;
     }
-    if (!node.getNoOuterJoin() || !target.getNoOuterJoin()) {
-      // todo 8 way could be not enough number
-      if (node.getLeftAliases().length + node.getRightAliases().length + 1 >= 32) {
-        LOG.info(ErrorMsg.JOINNODE_OUTERJOIN_MORETHAN_32);
-        return false;
+    // make array with QBJoinTree : outer most(0) --> inner most(n)
+    List<QBJoinTree> trees = new ArrayList<QBJoinTree>();
+    for (;tree != null; tree = tree.getJoinSrc()) {
+      trees.add(tree);
+    }
+    // merging from 'target'(inner) to 'node'(outer)
+    for (int i = trees.size() - 1; i >= 0; i--) {
+      QBJoinTree target = trees.get(i);
+      if (target == null) {
+        continue;
+      }
+      JoinType prevType = null;   // save join type
+      for (int j = i - 1; j >= 0; j--) {
+        QBJoinTree node = trees.get(j);
+        if (node == null) {
+          continue;
+        }
+        JoinType currType = getType(node.getJoinCond());
+        if (prevType != null && prevType != currType) {
+          break;
+        }
+        int pos = findMergePos(node, target);
+        if (pos >= 0) {
+          // for outer joins, it should not exceed 16 aliases (short type)
+          if (!node.getNoOuterJoin() || !target.getNoOuterJoin()) {
+            if (node.getRightAliases().length + target.getRightAliases().length + 1 > 16) {
+              LOG.info(ErrorMsg.JOINNODE_OUTERJOIN_MORETHAN_16);
+              continue;
+            }
+          }
+          mergeJoins(qb, node, target, pos);
+          trees.set(j, null);
+          continue; // continue merging with next alias
+        }
+        if (prevType == null) {
+          prevType = currType;
+        }
       }
     }
-    int res = findMergePos(node, target);
-    if (res != -1) {
-      mergeJoins(qb, parent, node, target, res);
-      return true;
+    // reconstruct join tree
+    QBJoinTree current = null;
+    for (int i = 0; i < trees.size(); i++) {
+      QBJoinTree target = trees.get(i);
+      if (target == null) {
+        continue;
+      }
+      if (current == null) {
+        qb.setQbJoinTree(current = target);
+      } else {
+        current.setJoinSrc(target);
+        current = target;
+      }
     }
-
-    return mergeJoinNodes(qb, parent, node, target.getJoinSrc());
   }
 
-  private void mergeJoinTree(QB qb) {
-    QBJoinTree root = qb.getQbJoinTree();
-    QBJoinTree parent = null;
-    while (root != null) {
-      boolean merged = mergeJoinNodes(qb, parent, root, root.getJoinSrc());
-
-      if (parent == null) {
-        if (merged) {
-          root = qb.getQbJoinTree();
-        } else {
-          parent = root;
-          root = root.getJoinSrc();
-        }
-      } else {
-        if (merged) {
-          root = root.getJoinSrc();
-        } else {
-          parent = parent.getJoinSrc();
-          root = parent.getJoinSrc();
-        }
+  // Join types should be all the same for merging (or returns null)
+  private JoinType getType(JoinCond[] conds) {
+    JoinType type = conds[0].getJoinType();
+    for (int k = 1; k < conds.length; k++) {
+      if (type != conds[k].getJoinType()) {
+        return null;
       }
     }
+    return type;
   }
 
   private Operator insertSelectAllPlanForGroupBy(Operator input)
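
As an aside, the merge strategy described in the comments of the new mergeJoinTree above (flatten the join tree, merge inner-to-outer while the join type stays the same, then rebuild the chain) can be sketched in plain Java. JoinNode, canMerge() and the String join types below are simplified, hypothetical stand-ins for QBJoinTree, findMergePos() and JoinType, not Hive classes:

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the flatten -> merge -> rebuild pattern used by mergeJoinTree.
public class JoinTreeMergeSketch {
  static class JoinNode {
    String joinType;          // all join conditions assumed to share one type here
    JoinNode source;          // inner join tree (like QBJoinTree.getJoinSrc())
    JoinNode(String type, JoinNode src) { joinType = type; source = src; }
  }

  // stand-in for findMergePos(): returns true if the two nodes can be merged
  static boolean canMerge(JoinNode outer, JoinNode inner) {
    return outer.joinType.equals(inner.joinType);
  }

  static JoinNode mergeTree(JoinNode root) {
    // 1. flatten: outermost (index 0) to innermost (index n)
    List<JoinNode> nodes = new ArrayList<JoinNode>();
    for (JoinNode t = root; t != null; t = t.source) {
      nodes.add(t);
    }
    // 2. try to merge each outer node into the innermost surviving target,
    //    stopping for a target once a differing join type is seen
    for (int i = nodes.size() - 1; i >= 0; i--) {
      JoinNode target = nodes.get(i);
      if (target == null) { continue; }
      String prevType = null;
      for (int j = i - 1; j >= 0; j--) {
        JoinNode node = nodes.get(j);
        if (node == null) { continue; }
        String currType = node.joinType;
        if (prevType != null && !prevType.equals(currType)) { break; }
        if (canMerge(node, target)) {
          nodes.set(j, null);   // node's aliases/filters would be folded into target
          continue;             // continue merging with the next outer node
        }
        if (prevType == null) { prevType = currType; }
      }
    }
    // 3. rebuild the chain from the surviving nodes, outermost first
    JoinNode newRoot = null, current = null;
    for (JoinNode t : nodes) {
      if (t == null) { continue; }
      if (current == null) { newRoot = current = t; }
      else { current.source = t; current = t; }
    }
    return newRoot;
  }

  public static void main(String[] args) {
    // a ((A-B)-C)-D style chain represented through source links
    JoinNode ab = new JoinNode("INNER", null);
    JoinNode c  = new JoinNode("LEFTOUTER", ab);
    JoinNode d  = new JoinNode("INNER", c);
    System.out.println(mergeTree(d) != null);
  }
}

The real implementation additionally folds the merged node's aliases, join conditions and filters into the target, and enforces the 16-alias limit for outer joins.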
@@ -7109,6 +7241,11 @@ public class SemanticAnalyzer extends Ba
       curr = genHavingPlan(dest, qb, curr);
     }
 
+
+    if(queryProperties.hasWindowing() && qb.getWindowingSpec(dest) != null) {
+      curr = genWindowingPlan(qb.getWindowingSpec(dest), curr);
+    }
+
     curr = genSelectPlan(dest, qb, curr);
     Integer limit = qbp.getDestLimit(dest);
 
@@ -7148,6 +7285,7 @@ public class SemanticAnalyzer extends Ba
       curr = genReduceSinkPlan(dest, qb, curr, numReducers);
     }
 
+
     if (qbp.getIsSubQ()) {
       if (limit != null) {
         // In case of order by, only 1 reducer is used, so no need of
@@ -7368,17 +7506,21 @@ public class SemanticAnalyzer extends Ba
     }
 
     RowResolver rowResolver = new RowResolver();
+    Map<String, ExprNodeDesc> columnExprMap = new HashMap<String, ExprNodeDesc>();
+
     List<String> colName = new ArrayList<String>();
     for (int i = 0; i < columns.size(); i++) {
       String name = getColumnInternalName(i);
-      rowResolver.put(origInputAlias, name, new ColumnInfo(name, columns.get(i)
-          .getTypeInfo(), "", false));
+      ColumnInfo col = new ColumnInfo(name, columns.get(i)
+          .getTypeInfo(), "", false);
+      rowResolver.put(origInputAlias, name, col);
       colName.add(name);
+      columnExprMap.put(name, columns.get(i));
     }
 
     Operator<SelectDesc> newInputOp = OperatorFactory.getAndMakeChild(
         new SelectDesc(columns, colName), new RowSchema(rowResolver.getColumnInfos()),
-        origInputOp);
+        columnExprMap, origInputOp);
     return putOpInsertMap(newInputOp, rowResolver);
   }
 
@@ -7793,11 +7935,38 @@ public class SemanticAnalyzer extends Ba
       aliasToOpInfo.put(alias, op);
     }
 
+    Operator srcOpInfo = null;
+    Operator lastPTFOp = null;
+
+    if(queryProperties.hasPTF()){
+      //After processing subqueries and source tables, process
+      // partitioned table functions
+
+      HashMap<ASTNode, PTFInvocationSpec> ptfNodeToSpec = qb.getPTFNodeToSpec();
+      if ( ptfNodeToSpec != null ) {
+        for(Entry<ASTNode, PTFInvocationSpec> entry : ptfNodeToSpec.entrySet()) {
+          ASTNode ast = entry.getKey();
+          PTFInvocationSpec spec = entry.getValue();
+          String inputAlias = spec.getQueryInputName();
+          Operator inOp = aliasToOpInfo.get(inputAlias);
+          if ( inOp == null ) {
+            throw new SemanticException(generateErrorMessage(ast,
+                "Cannot resolve input Operator for PTF invocation"));
+          }
+          lastPTFOp = genPTFPlan(spec, inOp);
+          String ptfAlias = ((PartitionedTableFunctionSpec)spec.getFunction()).getAlias();
+          if ( ptfAlias != null ) {
+            aliasToOpInfo.put(ptfAlias, lastPTFOp);
+          }
+        }
+      }
+
+    }
+
     // For all the source tables that have a lateral view, attach the
     // appropriate operators to the TS
     genLateralViewPlans(aliasToOpInfo, qb);
 
-    Operator srcOpInfo = null;
 
     // process join
     if (qb.getParseInfo().getJoinExpr() != null) {
@@ -7820,6 +7989,9 @@ public class SemanticAnalyzer extends Ba
       // Now if there are more than 1 sources then we have a join case
       // later we can extend this to the union all case as well
       srcOpInfo = aliasToOpInfo.values().iterator().next();
+      // with ptfs, there may be more (note for PTFChains:
+      // 1 ptf invocation may entail multiple PTF operators)
+      srcOpInfo = lastPTFOp != null ? lastPTFOp : srcOpInfo;
     }
 
     Operator bodyOpInfo = genBodyPlan(qb, srcOpInfo);
@@ -8478,7 +8650,8 @@ public class SemanticAnalyzer extends Ba
     }
 
     // continue analyzing from the child ASTNode.
-    if (!doPhase1(child, qb, initPhase1Ctx())) {
+    Phase1Ctx ctx_1 = initPhase1Ctx();
+    if (!doPhase1(child, qb, ctx_1)) {
       // if phase1Result false return
       return;
     }
@@ -8515,11 +8688,12 @@ public class SemanticAnalyzer extends Ba
 
     ParseContext pCtx = new ParseContext(conf, qb, child, opToPartPruner,
         opToPartList, topOps, topSelOps, opParseCtx, joinContext, smbMapJoinContext,
-        topToTable,
+        topToTable, fsopToTable,
         loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
         opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks,
-        opToPartToSkewedPruner, viewAliasToInput);
+        opToPartToSkewedPruner, viewAliasToInput,
+        reduceSinkOperatorsAddedByEnforceBucketingSorting, queryProperties);
 
     // Generate table access stats if required
     if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {
@@ -9672,4 +9846,1040 @@ public class SemanticAnalyzer extends Ba
   public void setQB(QB qb) {
     this.qb = qb;
   }
+
+//--------------------------- PTF handling -----------------------------------
+
+  /*
+   * - a partitionTableFunctionSource can be a tableReference, a SubQuery or another
+   *   PTF invocation.
+   * - For a TABLEREF: set the source to the alias returned by processTable
+   * - For a SubQuery: set the source to the alias returned by processSubQuery
+   * - For a PTF invocation: recursively call processPTFChain.
+   */
+  private PTFInputSpec processPTFSource(QB qb, ASTNode inputNode) throws SemanticException{
+
+    PTFInputSpec qInSpec = null;
+    int type = inputNode.getType();
+    String alias;
+    switch(type)
+    {
+    case HiveParser.TOK_TABREF:
+      alias = processTable(qb, inputNode);
+      qInSpec = new PTFQueryInputSpec();
+      ((PTFQueryInputSpec)qInSpec).setType(PTFQueryInputType.TABLE);
+      ((PTFQueryInputSpec)qInSpec).setSource(alias);
+      break;
+    case HiveParser.TOK_SUBQUERY:
+      alias = processSubQuery(qb, inputNode);
+      qInSpec = new PTFQueryInputSpec();
+      ((PTFQueryInputSpec)qInSpec).setType(PTFQueryInputType.SUBQUERY);
+      ((PTFQueryInputSpec)qInSpec).setSource(alias);
+      break;
+    case HiveParser.TOK_PTBLFUNCTION:
+      qInSpec = processPTFChain(qb, inputNode);
+      break;
+    default:
+      throw new SemanticException(generateErrorMessage(inputNode,
+          "Unknown input type to PTF"));
+    }
+
+    qInSpec.setAstNode(inputNode);
+    return qInSpec;
+
+  }
+
+  /*
+   * - tree form is
+   *   ^(TOK_PTBLFUNCTION name alias? partitionTableFunctionSource partitioningSpec? arguments*)
+   * - a partitionTableFunctionSource can be a tableReference, a SubQuery or another
+   *   PTF invocation.
+   */
+  private PartitionedTableFunctionSpec processPTFChain(QB qb, ASTNode ptf)
+      throws SemanticException{
+    int child_count = ptf.getChildCount();
+    if (child_count < 2) {
+      throw new SemanticException(generateErrorMessage(ptf,
+                  "Not enough Children " + child_count));
+    }
+
+    PartitionedTableFunctionSpec ptfSpec = new PartitionedTableFunctionSpec();
+    ptfSpec.setAstNode(ptf);
+
+    /*
+     * name
+     */
+    ASTNode nameNode = (ASTNode) ptf.getChild(0);
+    ptfSpec.setName(nameNode.getText());
+
+    int inputIdx = 1;
+
+    /*
+     * alias
+     */
+    ASTNode secondChild = (ASTNode) ptf.getChild(1);
+    if ( secondChild.getType() == HiveParser.Identifier ) {
+      ptfSpec.setAlias(secondChild.getText());
+      inputIdx++;
+    }
+
+    /*
+     * input
+     */
+    ASTNode inputNode = (ASTNode) ptf.getChild(inputIdx);
+    ptfSpec.setInput(processPTFSource(qb, inputNode));
+
+    int argStartIdx = inputIdx + 1;
+
+    /*
+     * partitioning Spec
+     */
+    int pSpecIdx = inputIdx + 1;
+    ASTNode pSpecNode = ptf.getChildCount() > inputIdx ?
+        (ASTNode) ptf.getChild(pSpecIdx) : null;
+    if (pSpecNode != null && pSpecNode.getType() == HiveParser.TOK_PARTITIONINGSPEC)
+    {
+      PartitioningSpec partitioning = processPTFPartitionSpec(pSpecNode);
+      ptfSpec.setPartitioning(partitioning);
+      argStartIdx++;
+    }
+
+    /*
+     * arguments
+     */
+    for(int i=argStartIdx; i < ptf.getChildCount(); i++)
+    {
+      ptfSpec.addArg((ASTNode) ptf.getChild(i));
+    }
+    return ptfSpec;
+  }
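
For illustration, the positional parsing of optional children done in processPTFChain above (name, optional alias, mandatory input, optional partitioning spec, trailing arguments) can be sketched with a hypothetical Node type in place of Hive's ASTNode:

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of positional parsing with optional children.
public class OptionalChildParseSketch {
  enum NodeType { NAME, IDENTIFIER, SOURCE, PARTITIONING_SPEC, EXPRESSION }
  static class Node {
    NodeType type; String text;
    Node(NodeType t, String s) { type = t; text = s; }
  }
  static class ParsedPTF {
    String name; String alias; Node input; Node partitioning;
    List<Node> args = new ArrayList<Node>();
  }

  static ParsedPTF parse(List<Node> children) {
    ParsedPTF p = new ParsedPTF();
    p.name = children.get(0).text;                 // name is always child 0
    int idx = 1;
    if (children.get(idx).type == NodeType.IDENTIFIER) {
      p.alias = children.get(idx).text;            // optional alias
      idx++;
    }
    p.input = children.get(idx++);                 // mandatory input source
    if (idx < children.size()
        && children.get(idx).type == NodeType.PARTITIONING_SPEC) {
      p.partitioning = children.get(idx++);        // optional partitioning spec
    }
    for (; idx < children.size(); idx++) {         // remaining children are arguments
      p.args.add(children.get(idx));
    }
    return p;
  }

  public static void main(String[] args) {
    List<Node> kids = new ArrayList<Node>();
    kids.add(new Node(NodeType.NAME, "noop"));
    kids.add(new Node(NodeType.SOURCE, "subquery"));
    kids.add(new Node(NodeType.EXPRESSION, "arg1"));
    ParsedPTF p = parse(kids);
    System.out.println(p.name + " alias=" + p.alias + " args=" + p.args.size());
  }
}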
+
+  /*
+   * - invoked during FROM AST tree processing, on encountering a PTF invocation.
+   * - tree form is
+   *   ^(TOK_PTBLFUNCTION name partitionTableFunctionSource partitioningSpec? arguments*)
+   * - setup a PTFInvocationSpec for this top level PTF invocation.
+   */
+  private void processPTF(QB qb, ASTNode ptf) throws SemanticException{
+
+    PartitionedTableFunctionSpec ptfSpec = processPTFChain(qb, ptf);
+
+    if ( ptfSpec.getAlias() != null ) {
+      qb.addAlias(ptfSpec.getAlias());
+    }
+
+    PTFInvocationSpec spec = new PTFInvocationSpec();
+    spec.setFunction(ptfSpec);
+    qb.addPTFNodeToSpec(ptf, spec);
+  }
+
+//--------------------------- Windowing handling -----------------------------------
+
+  /*
+   * - A Select Item form is: ^(TOK_SELEXPR selectExpression Identifier* window_specification?)
+   * What makes a UDAF invocation a Windowing Function invocation:
+   * 1. It appears in a SelectExpr that has a WindowSpec
+   * 2. It is a UDAF that implies order (FunctionRegistry.impliesOrder)
+   * 3. It contains lead/lag UDF invocations in its args.
+   */
+  private boolean checkAndExtractWindowFunctionsInSelect(QB qb, ASTNode selectExpr, String dest)
+      throws SemanticException {
+
+    int childCount = selectExpr.getChildCount();
+    ASTNode windowSpec = (ASTNode) selectExpr.getChild(childCount - 1);
+
+    boolean hasWindowSpec = windowSpec.getType() == HiveParser.TOK_WINDOWSPEC;
+
+    ArrayList<ASTNode> functions =
+        extractWindowingUDAFs((ASTNode) selectExpr.getChild(0), !hasWindowSpec);
+    if ( functions.size() == 0 ) {
+      return false;
+    }
+
+    WindowingSpec spec = qb.getWindowingSpec(dest);
+    if(spec == null) {
+      queryProperties.setHasWindowing(true);
+      spec = new WindowingSpec();
+      qb.addDestToWindowingSpec(dest, spec);
+    }
+
+    HashMap<String, ASTNode> wExprsInDest = qb.getParseInfo().getWindowingExprsForClause(dest);
+    int wColIdx = spec.getWindowExpressions() == null ? 0 : spec.getWindowExpressions().size();
+    for(ASTNode function : functions) {
+      WindowFunctionSpec wFnSpec = processWindowFunction(function,
+          hasWindowSpec ? windowSpec : null);
+
+      /*
+       * If this is a duplicate invocation of a function, don't add it to the WindowingSpec.
+       */
+      if ( wExprsInDest != null &&
+          wExprsInDest.containsKey(wFnSpec.getExpression().toStringTree())) {
+        continue;
+      }
+      wFnSpec.setAlias("_wcol" + wColIdx++);
+      spec.addWindowFunction(wFnSpec);
+      qb.getParseInfo().addWindowingExprToClause(dest, wFnSpec.getExpression());
+    }
+    return true;
+  }
+
+  /*
+   * return the UDAFs within the expressionTree.
+   * If implyOrder is true, then only return the invocations that:
+   * - are for UDAFs that imply order (FunctionRegistry.impliesOrder)
+   * - or contain a Lead/Lag UDF invocation in their arguments
+   * If implyOrder is false, then return all UDAF invocations.
+   */
+  private ArrayList<ASTNode> extractWindowingUDAFs(ASTNode expressionTree, boolean implyOrder) {
+    ArrayList<ASTNode> aggregations = new ArrayList<ASTNode>();
+    extractWindowingUDAFs(expressionTree, aggregations);
+    if (!implyOrder) {
+      return aggregations;
+    }
+    ArrayList<ASTNode> wdwUDAFs = new ArrayList<ASTNode>();
+    for(ASTNode function : aggregations) {
+      String fnName = function.getChild(0).getText().toLowerCase();
+      if ( FunctionRegistry.impliesOrder(fnName)) {
+        wdwUDAFs.add(function);
+        continue;
+      }
+      boolean hasLLInArgs = false;
+      for(int i=1; i < function.getChildCount(); i++) {
+        ASTNode child = (ASTNode) function.getChild(i);
+        hasLLInArgs = containsLeadLagUDF(child);
+        if (hasLLInArgs) {
+          break;
+        }
+      }
+      if (hasLLInArgs) {
+        wdwUDAFs.add(function);
+      }
+    }
+    return wdwUDAFs;
+  }
+
+  private void extractWindowingUDAFs(ASTNode expressionTree,
+      ArrayList<ASTNode> aggregations) {
+    int exprTokenType = expressionTree.getToken().getType();
+    if (exprTokenType == HiveParser.TOK_FUNCTION
+        || exprTokenType == HiveParser.TOK_FUNCTIONDI
+        || exprTokenType == HiveParser.TOK_FUNCTIONSTAR) {
+      assert (expressionTree.getChildCount() != 0);
+      if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
+        String functionName = unescapeIdentifier(expressionTree.getChild(0)
+            .getText());
+        WindowFunctionInfo fi = FunctionRegistry.getWindowFunctionInfo(functionName);
+        if (fi != null) {
+          aggregations.add(expressionTree);
+          return;
+        }
+      }
+    }
+    for (int i = 0; i < expressionTree.getChildCount(); i++) {
+      extractWindowingUDAFs((ASTNode) expressionTree.getChild(i),
+          aggregations);
+    }
+  }
+
+  private boolean containsLeadLagUDF(ASTNode expressionTree) {
+    int exprTokenType = expressionTree.getToken().getType();
+    if (exprTokenType == HiveParser.TOK_FUNCTION) {
+      assert (expressionTree.getChildCount() != 0);
+      if (expressionTree.getChild(0).getType() == HiveParser.Identifier) {
+        String functionName = unescapeIdentifier(expressionTree.getChild(0)
+            .getText());
+        functionName = functionName.toLowerCase();
+        if ( FunctionRegistry.LAG_FUNC_NAME.equals(functionName) ||
+            FunctionRegistry.LEAD_FUNC_NAME.equals(functionName)
+            ) {
+          return true;
+        }
+      }
+    }
+    for (int i = 0; i < expressionTree.getChildCount(); i++) {
+      if ( containsLeadLagUDF((ASTNode) expressionTree.getChild(i))) {
+        return true;
+      }
+    }
+    return false;
+  }
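
The two recursive walks above (extractWindowingUDAFs and containsLeadLagUDF) follow a common pattern: descend until a function node of interest is found, then stop or report. A minimal standalone sketch, using an illustrative Expr type and a hard-coded registry instead of Hive's ASTNode and FunctionRegistry:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal sketch of the recursive UDAF collection and lead/lag detection walks.
public class WindowingWalkSketch {
  static class Expr {
    String name;                       // function name or column name at this node
    List<Expr> children = new ArrayList<Expr>();
    Expr(String name, Expr... kids) { this.name = name; children.addAll(Arrays.asList(kids)); }
  }

  // stand-in registry: only names in this set are treated as window function calls
  static final Set<String> WINDOW_FUNCS =
      new HashSet<String>(Arrays.asList("rank", "sum", "lead", "lag"));

  // analogous to extractWindowingUDAFs: stop descending once a known function is found
  static void collect(Expr e, List<Expr> out) {
    if (WINDOW_FUNCS.contains(e.name)) {
      out.add(e);
      return;
    }
    for (Expr c : e.children) { collect(c, out); }
  }

  // analogous to containsLeadLagUDF
  static boolean containsLeadLag(Expr e) {
    if ("lead".equals(e.name) || "lag".equals(e.name)) { return true; }
    for (Expr c : e.children) {
      if (containsLeadLag(c)) { return true; }
    }
    return false;
  }

  public static void main(String[] args) {
    // sum(lead(x)) + y  -- sum qualifies because its argument contains lead()
    Expr tree = new Expr("+", new Expr("sum", new Expr("lead", new Expr("x"))), new Expr("y"));
    List<Expr> found = new ArrayList<Expr>();
    collect(tree, found);
    System.out.println(found.size() + " " + containsLeadLag(found.get(0)));
  }
}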
+
+  /*
+   * - Invoked during Phase1 when a TOK_SELECT is encountered.
+   * - Select tree form is: ^(TOK_SELECT ^(TOK_SELEXPR...) ^(TOK_SELEXPR...) ...)
+   * - A Select Item form is: ^(TOK_SELEXPR selectExpression Identifier* window_specification?)
+   *
+   * See checkAndExtractWindowFunctionsInSelect for rules on what makes a UDAF invocation
+   * a Windowing Function invocation
+   */
+  private void handleWindowingExprsInSelectList(QB qb, String dest, ASTNode selectNode)
+      throws SemanticException {
+    for(int i=0; i < selectNode.getChildCount(); i++)
+    {
+      ASTNode selectExpr = (ASTNode) selectNode.getChild(i);
+      if ( selectExpr.getType() != HiveParser.TOK_SELEXPR )
+      {
+        continue;
+      }
+      boolean hasWindowingExprs = checkAndExtractWindowFunctionsInSelect(qb, selectExpr, dest);
+
+    }
+  }
+
+  private void handleQueryWindowClauses(QB qb, Phase1Ctx ctx_1, ASTNode node)
+      throws SemanticException {
+    WindowingSpec spec = qb.getWindowingSpec(ctx_1.dest);
+    for(int i=0; i < node.getChildCount(); i++) {
+      processQueryWindowClause(spec, (ASTNode) node.getChild(i));
+    }
+  }
+
+  private PartitionSpec processPartitionSpec(ASTNode node) {
+    PartitionSpec pSpec = new PartitionSpec();
+    int exprCnt = node.getChildCount();
+    for(int i=0; i < exprCnt; i++) {
+      PartitionExpression exprSpec = new PartitionExpression();
+      exprSpec.setExpression((ASTNode) node.getChild(i));
+      pSpec.addExpression(exprSpec);
+    }
+    return pSpec;
+  }
+
+  private OrderSpec processOrderSpec(ASTNode sortNode) {
+    OrderSpec oSpec = new OrderSpec();
+    int exprCnt = sortNode.getChildCount();
+    for(int i=0; i < exprCnt; i++) {
+      OrderExpression exprSpec = new OrderExpression();
+      exprSpec.setExpression((ASTNode) sortNode.getChild(i).getChild(0));
+      if ( sortNode.getChild(i).getType() == HiveParser.TOK_TABSORTCOLNAMEASC ) {
+        exprSpec.setOrder(org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.ASC);
+      }
+      else {
+        exprSpec.setOrder(org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.DESC);
+      }
+      oSpec.addExpression(exprSpec);
+    }
+    return oSpec;
+  }
+
+  private PartitioningSpec processPTFPartitionSpec(ASTNode pSpecNode)
+  {
+    PartitioningSpec partitioning = new PartitioningSpec();
+    ASTNode firstChild = (ASTNode) pSpecNode.getChild(0);
+    int type = firstChild.getType();
+    int exprCnt;
+
+
+    if ( type == HiveParser.TOK_DISTRIBUTEBY || type == HiveParser.TOK_CLUSTERBY )
+    {
+      PartitionSpec pSpec = processPartitionSpec(firstChild);
+      partitioning.setPartSpec(pSpec);
+      ASTNode sortNode = pSpecNode.getChildCount() > 1 ? (ASTNode) pSpecNode.getChild(1) : null;
+      if ( sortNode != null )
+      {
+        OrderSpec oSpec = processOrderSpec(sortNode);
+        partitioning.setOrderSpec(oSpec);
+      }
+    }
+    else if ( type == HiveParser.TOK_SORTBY || type == HiveParser.TOK_ORDERBY ) {
+      ASTNode sortNode = firstChild;
+      OrderSpec oSpec = processOrderSpec(sortNode);
+      partitioning.setOrderSpec(oSpec);
+    }
+    return partitioning;
+  }
+
+  private WindowFunctionSpec processWindowFunction(ASTNode node, ASTNode wsNode)
+    throws SemanticException {
+    WindowFunctionSpec wfSpec = new WindowFunctionSpec();
+
+    switch(node.getType()) {
+    case HiveParser.TOK_FUNCTIONSTAR:
+      wfSpec.setStar(true);
+      break;
+    case HiveParser.TOK_FUNCTIONDI:
+      wfSpec.setDistinct(true);
+      break;
+    }
+
+    if ( wfSpec.isDistinct() ) {
+      throw new SemanticException(generateErrorMessage(node,
+          "Count/Sum distinct not supported with Windowing"));
+    }
+
+    wfSpec.setExpression(node);
+
+    ASTNode nameNode = (ASTNode) node.getChild(0);
+    wfSpec.setName(nameNode.getText());
+
+    for(int i=1; i < node.getChildCount(); i++) {
+      ASTNode child = (ASTNode) node.getChild(i);
+      wfSpec.addArg(child);
+    }
+
+    if ( wsNode != null ) {
+      WindowSpec ws = processWindowSpec(wsNode);
+      wfSpec.setWindowSpec(ws);
+    }
+
+    /*
+     * In order to distinguish between different invocations of the same UDAF over different Windows,
+     * add the WdwSpec node as a child of the Function Node.
+     * It is safe to do this after the function node has been converted to a WdwFuncSpec.
+     */
+    if ( wsNode != null ) {
+      node.addChild(wsNode);
+    }
+
+    return wfSpec;
+  }
+
+  private void processQueryWindowClause(WindowingSpec spec, ASTNode node)
+      throws SemanticException {
+    ASTNode nameNode = (ASTNode) node.getChild(0);
+    ASTNode wsNode = (ASTNode) node.getChild(1);
+    if(spec.getWindowSpecs() != null && spec.getWindowSpecs().containsKey(nameNode.getText())){
+      throw new SemanticException(generateErrorMessage(nameNode,
+          "Duplicate definition of window " + nameNode.getText() +
+          " is not allowed"));
+    }
+    WindowSpec ws = processWindowSpec(wsNode);
+    spec.addWindowSpec(nameNode.getText(), ws);
+  }
+
+  private WindowSpec processWindowSpec(ASTNode node) throws SemanticException {
+    String sourceId = null;
+    PartitionSpec partition = null;
+    OrderSpec order = null;
+    WindowFrameSpec windowFrame = null;
+
+    boolean hasSrcId = false, hasPartSpec = false, hasWF = false;
+    int srcIdIdx = -1, partIdx = -1, wfIdx = -1;
+
+    for(int i=0; i < node.getChildCount(); i++)
+    {
+      int type = node.getChild(i).getType();
+      switch(type)
+      {
+      case HiveParser.Identifier:
+        hasSrcId = true; srcIdIdx = i;
+        break;
+      case HiveParser.TOK_PARTITIONINGSPEC:
+        hasPartSpec = true; partIdx = i;
+        break;
+      case HiveParser.TOK_WINDOWRANGE:
+      case HiveParser.TOK_WINDOWVALUES:
+        hasWF = true; wfIdx = i;
+        break;
+      }
+    }
+
+    WindowSpec ws = new WindowSpec();
+
+    if (hasSrcId) {
+      ASTNode nameNode = (ASTNode) node.getChild(srcIdIdx);
+      ws.setSourceId(nameNode.getText());
+    }
+
+    if (hasPartSpec) {
+      ASTNode partNode = (ASTNode) node.getChild(partIdx);
+      PartitioningSpec partitioning = processPTFPartitionSpec(partNode);
+      ws.setPartitioning(partitioning);
+    }
+
+    if ( hasWF)
+    {
+      ASTNode wfNode = (ASTNode) node.getChild(wfIdx);
+      WindowFrameSpec wfSpec = processWindowFrame(wfNode);
+      ws.setWindowFrame(wfSpec);
+    }
+
+    return ws;
+  }
+
+  private WindowFrameSpec processWindowFrame(ASTNode node) throws SemanticException {
+    int type = node.getType();
+    BoundarySpec start = null, end = null;
+
+    /*
+     * A WindowFrame may contain just the start boundary; in the
+     * BETWEEN style of expressing a WindowFrame, both boundaries
+     * are specified.
+     */
+    start = processBoundary(type, (ASTNode) node.getChild(0));
+    if ( node.getChildCount() > 1 ) {
+      end = processBoundary(type, (ASTNode) node.getChild(1));
+    }
+
+    return new WindowFrameSpec(start, end);
+  }
+
+  private BoundarySpec processBoundary(int frameType, ASTNode node)  throws SemanticException {
+    BoundarySpec bs = frameType == HiveParser.TOK_WINDOWRANGE ?
+        new RangeBoundarySpec() : new ValueBoundarySpec();
+    int type = node.getType();
+    boolean hasAmt = true;
+
+    switch(type)
+    {
+    case HiveParser.KW_PRECEDING:
+      bs.setDirection(Direction.PRECEDING);
+      break;
+    case HiveParser.KW_FOLLOWING:
+      bs.setDirection(Direction.FOLLOWING);
+      break;
+    case HiveParser.KW_CURRENT:
+      bs = new CurrentRowSpec();
+      hasAmt = false;
+      break;
+    }
+
+    if ( hasAmt )
+    {
+      ASTNode amtNode = (ASTNode) node.getChild(0);
+      if ( amtNode.getType() == HiveParser.KW_UNBOUNDED)
+      {
+        bs.setAmt(BoundarySpec.UNBOUNDED_AMOUNT);
+      }
+      else
+      {
+        int amt = Integer.parseInt(amtNode.getText());
+        if ( amt < 0 ) {
+          throw new SemanticException(
+              "Window Frame Boundary Amount must be a +ve integer, amount provide is: " + amt);
+        }
+        bs.setAmt(amt);
+      }
+    }
+
+    return bs;
+  }
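
A window frame boundary therefore reduces to a direction plus either an integer amount, UNBOUNDED, or CURRENT ROW with no amount. A minimal sketch of that shape (Boundary and parseBoundary below are illustrative, not Hive's BoundarySpec classes):

// Minimal sketch of window frame boundary handling.
public class BoundarySketch {
  enum Direction { PRECEDING, CURRENT, FOLLOWING }
  static final int UNBOUNDED_AMOUNT = Integer.MAX_VALUE;

  static class Boundary {
    Direction direction; Integer amt;                // amt == null for CURRENT ROW
    Boundary(Direction d, Integer a) { direction = d; amt = a; }
  }

  static Boundary parseBoundary(String direction, String amount) {
    Direction d = Direction.valueOf(direction);
    if (d == Direction.CURRENT) {
      return new Boundary(d, null);                  // CURRENT ROW carries no amount
    }
    if ("UNBOUNDED".equalsIgnoreCase(amount)) {
      return new Boundary(d, UNBOUNDED_AMOUNT);
    }
    int amt = Integer.parseInt(amount);
    if (amt < 0) {                                   // amounts must be non-negative
      throw new IllegalArgumentException("boundary amount must be a +ve integer: " + amt);
    }
    return new Boundary(d, amt);
  }

  public static void main(String[] args) {
    // e.g. ROWS BETWEEN 3 PRECEDING AND CURRENT ROW
    Boundary start = parseBoundary("PRECEDING", "3");
    Boundary end = parseBoundary("CURRENT", null);
    System.out.println(start.amt + " " + (end.amt == null));
  }
}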
+
+  /*
+   * check if a Select Expr is a constant.
+   * - the current logic is to look for HiveParser.TOK_TABLE_OR_COL
+   * - if there is none then the expression is a constant.
+   */
+  private static class ConstantExprCheck implements ContextVisitor {
+    boolean isConstant = true;
+
+    public void visit(Object t, Object parent, int childIndex, Map labels) {
+      if ( !isConstant ) {
+        return;
+      }
+      ASTNode node = (ASTNode) t;
+      if (ParseDriver.adaptor.getType(t) == HiveParser.TOK_TABLE_OR_COL ) {
+        isConstant = false;
+      }
+    }
+
+    public void reset() {
+      isConstant = true;
+    }
+
+    protected boolean isConstant() {
+      return isConstant;
+    }
+  }
+
+  private static class AggregationExprCheck implements ContextVisitor {
+    HashMap<String, ASTNode> destAggrExprs;
+    boolean isAggr = false;
+
+    public AggregationExprCheck(HashMap<String, ASTNode> destAggrExprs) {
+      super();
+      this.destAggrExprs = destAggrExprs;
+    }
+
+    public void visit(Object t, Object parent, int childIndex, Map labels) {
+      if ( isAggr ) {
+        return;
+      }
+      if ( destAggrExprs.values().contains(t)) {
+        isAggr = true;
+      }
+    }
+
+    public void reset() {
+      isAggr = false;
+    }
+
+    protected boolean isAggr() {
+      return isAggr;
+    }
+  }
+
+  /*
+   * Returns false if there is a SelectExpr that is neither a constant nor an aggregation.
+   *
+   */
+  private boolean isValidGroupBySelectList(QB currQB, String clause){
+    ConstantExprCheck constantExprCheck = new ConstantExprCheck();
+    AggregationExprCheck aggrExprCheck = new AggregationExprCheck(
+        currQB.getParseInfo().getAggregationExprsForClause(clause));
+
+    TreeWizard tw = new TreeWizard(ParseDriver.adaptor, HiveParser.tokenNames);
+    ASTNode selectNode = currQB.getParseInfo().getSelForClause(clause);
+
+    /*
+     * for Select Distinct Queries we don't move any aggregations.
+     */
+    if ( selectNode != null && selectNode.getType() == HiveParser.TOK_SELECTDI ) {
+      return true;
+    }
+
+    for (int i = 0; selectNode != null && i < selectNode.getChildCount(); i++) {
+      ASTNode selectExpr = (ASTNode) selectNode.getChild(i);
+      //check for TOK_HINTLIST expressions on ast
+      if(selectExpr.getType() != HiveParser.TOK_SELEXPR){
+        continue;
+      }
+
+      constantExprCheck.reset();
+      PTFTranslator.visit(selectExpr.getChild(0), constantExprCheck);
+
+      if ( !constantExprCheck.isConstant() ) {
+        aggrExprCheck.reset();
+        PTFTranslator.visit(selectExpr.getChild(0), aggrExprCheck);
+        if (!aggrExprCheck.isAggr() ) {
+          return false;
+        }
+      }
+
+    }
+    return true;
+  }
+
+//--------------------------- PTF handling: PTFInvocationSpec to PTFDesc --------------------------
+
+  private PTFDesc translatePTFInvocationSpec(PTFInvocationSpec ptfQSpec, RowResolver inputRR)
+      throws SemanticException{
+    PTFDesc ptfDesc = null;
+    PTFTranslator translator = new PTFTranslator();
+    ptfDesc = translator.translate(ptfQSpec, this, conf, inputRR, unparseTranslator);
+    return ptfDesc;
+  }
+
+  Operator genPTFPlan(PTFInvocationSpec ptfQSpec, Operator input) throws SemanticException {
+    ArrayList<PTFInvocationSpec> componentQueries = PTFTranslator.componentize(ptfQSpec);
+    for (PTFInvocationSpec ptfSpec : componentQueries) {
+      input = genPTFPlanForComponentQuery(ptfSpec, input);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created PTF Plan ");
+    }
+    return input;
+  }
+
+
+  /**
+   * Construct the data structures containing ExprNodeDesc for partition
+   * columns and order columns. Use the input definition to construct the list
+   * of output columns for the ReduceSinkOperator
+   *
+   * @throws SemanticException
+   */
+  void buildPTFReduceSinkDetails(PartitionedTableFunctionDef tabDef,
+      RowResolver inputRR,
+      ArrayList<ExprNodeDesc> partCols,
+      ArrayList<ExprNodeDesc> valueCols,
+      ArrayList<ExprNodeDesc> orderCols,
+      Map<String, ExprNodeDesc> colExprMap,
+      List<String> outputColumnNames,
+      StringBuilder orderString,
+      RowResolver extractRR) throws SemanticException {
+
+    ArrayList<PTFExpressionDef> partColList = tabDef.getPartition().getExpressions();
+
+    for (PTFExpressionDef colDef : partColList) {
+      partCols.add(colDef.getExprNode());
+      orderCols.add(colDef.getExprNode());
+      orderString.append('+');
+    }
+
+    /*
+     * Order columns are used as key columns for constructing
+     * the ReduceSinkOperator
+     * Since we do not explicitly add these to outputColumnNames,
+     * we need to set includeKeyCols = false while creating the
+     * ReduceSinkDesc
+     */
+    ArrayList<OrderExpressionDef> orderColList = tabDef.getOrder().getExpressions();
+    for (int i = 0; i < orderColList.size(); i++) {
+      OrderExpressionDef colDef = orderColList.get(i);
+      org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = colDef.getOrder();
+      if (order.name().equals("ASC")) {
+        orderString.append('+');
+      } else {
+        orderString.append('-');
+      }
+      orderCols.add(colDef.getExprNode());
+    }
+
+    /*
+     * We add the column to value columns or output column names
+     * only if it is not a virtual column
+     */
+    ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
+    LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+        new LinkedHashMap<String[], ColumnInfo>();
+    int pos = 0;
+    for (ColumnInfo colInfo : colInfoList) {
+      if (!colInfo.isHiddenVirtualCol()) {
+        ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
+            .getInternalName(), colInfo.getTabAlias(), colInfo
+            .getIsVirtualCol());
+        valueCols.add(valueColExpr);
+        colExprMap.put(colInfo.getInternalName(), valueColExpr);
+        String outColName = SemanticAnalyzer.getColumnInternalName(pos++);
+        outputColumnNames.add(outColName);
+
+        String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+        /*
+         * if we have already encountered this colInfo's internalName,
+         * we encounter it again because it was also added for the Having clause.
+         * We add these entries at the end, in a loop over colsAddedByHaving. See below.
+         */
+        if ( colsAddedByHaving.containsKey(alias)) {
+          continue;
+        }
+        ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+        ColumnInfo eColInfo = new ColumnInfo(
+            outColName, colInfo.getType(), alias[0],
+            colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+        if ( astNode == null ) {
+          extractRR.put(alias[0], alias[1], eColInfo);
+        }
+        else {
+          /*
+           * if the having clause refers to this column, it may have been added twice:
+           * once with the ASTNode.toStringTree as the alias
+           * and then with the real alias.
+           */
+          extractRR.putExpression(astNode, eColInfo);
+          if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+            colsAddedByHaving.put(alias, eColInfo);
+          }
+        }
+      }
+    }
+
+    for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
+      String[] alias = columnAddedByHaving.getKey();
+      ColumnInfo eColInfo = columnAddedByHaving.getValue();
+      extractRR.put(alias[0], alias[1], eColInfo);
+    }
+  }
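
One detail worth calling out from buildPTFReduceSinkDetails is how the ReduceSink order string is assembled: one '+' for every partition column (they always sort ascending), then '+' or '-' per order column depending on ASC/DESC. A small sketch of just that piece, with SortCol as an illustrative stand-in for OrderExpressionDef:

import java.util.Arrays;
import java.util.List;

// Minimal sketch of order string construction for the ReduceSink.
public class OrderStringSketch {
  static class SortCol {
    String name; boolean asc;
    SortCol(String name, boolean asc) { this.name = name; this.asc = asc; }
  }

  static String buildOrderString(List<String> partitionCols, List<SortCol> orderCols) {
    StringBuilder order = new StringBuilder();
    for (int i = 0; i < partitionCols.size(); i++) {
      order.append('+');                       // partition columns sort ascending
    }
    for (SortCol c : orderCols) {
      order.append(c.asc ? '+' : '-');         // ASC -> '+', DESC -> '-'
    }
    return order.toString();
  }

  public static void main(String[] args) {
    String s = buildOrderString(Arrays.asList("deptno"),
        Arrays.asList(new SortCol("salary", false), new SortCol("empno", true)));
    System.out.println(s);                     // prints "+-+"
  }
}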
+
+  private Operator genPTFPlanForComponentQuery(PTFInvocationSpec ptfQSpec, Operator input)
+      throws SemanticException {
+    /*
+     * 1. Create the PTFDesc from the Qspec attached to this QB.
+     */
+    RowResolver rr = opParseCtx.get(input).getRowResolver();
+    PTFDesc ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+    /*
+     * Build an RR for the Extract Op from the ReduceSink Op's RR.
+     * Why?
+     * We need to remove the Virtual Columns present in the RS's RR. The OI
+     * that gets passed to Extract at runtime doesn't contain the Virtual Columns.
+     * So internal names get changed. Consider testCase testJoinWithLeadLag,
+     * which is a self join on part and also has a Windowing expression.
+     * The RR of the RS op at translation time looks something like this:
+     * (_col1,_col2,..,_col7, _col8(vc=true),_col9(vc=true),
+     * _col10,_col11,.._col15(vc=true),_col16(vc=true),..)
+     * At runtime the Virtual columns are removed and all the columns after _col7
+     * are shifted 1 or 2 positions.
+     * So in child Operators ColumnExprNodeDesc's are no longer referring to the right columns.
+     *
+     * So we build a new RR for the Extract Op, with the Virtual Columns removed.
+     * We hand this to the PTFTranslator as the
+     * starting RR to use to translate a PTF Chain.
+     */
+    RowResolver extractOpRR = new RowResolver();
+
+    /*
+     * 2. build Map-side Op Graph. Graph template is either:
+     * Input -> PTF_map -> ReduceSink
+     * or
+     * Input -> ReduceSink
+     *
+     * Here the ExprNodeDescriptors in the QueryDef are based on the Input Operator's RR.
+     */
+    {
+      PartitionedTableFunctionDef tabDef = ptfDesc.getStartOfChain();
+
+      /*
+       * a. add Map-side PTF Operator if needed
+       */
+      if (tabDef.isTransformsRawInput() )
+      {
+        RowResolver ptfMapRR = tabDef.getRawInputShape().getRr();
+
+        input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+            new RowSchema(ptfMapRR.getColumnInfos()),
+            input), ptfMapRR);
+        rr = opParseCtx.get(input).getRowResolver();
+      }
+
+      /*
+       * b. Build Reduce Sink Details (keyCols, valueCols, outColNames etc.) for this ptfDesc.
+       */
+
+      ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+      ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+      ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
+      Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+      List<String> outputColumnNames = new ArrayList<String>();
+      StringBuilder orderString = new StringBuilder();
+
+      /*
+       * Use the input RR of TableScanOperator in case there is no map-side
+       * reshape of input.
+       * If the parent of ReduceSinkOperator is PTFOperator, use its
+       * output RR.
+       */
+      buildPTFReduceSinkDetails(tabDef,
+          rr,
+          partCols,
+          valueCols,
+          orderCols,
+          colExprMap,
+          outputColumnNames,
+          orderString,
+          extractOpRR);
+
+      input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+          .getReduceSinkDesc(orderCols,
+              valueCols, outputColumnNames, false,
+              -1, partCols, orderString.toString(), -1),
+          new RowSchema(rr.getColumnInfos()), input), rr);
+      input.setColumnExprMap(colExprMap);
+    }
+
+    /*
+     * 3. build Reduce-side Op Graph
+     */
+    {
+      /*
+       * b. Construct Extract Operator.
+       */
+      input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+          new ExtractDesc(
+              new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+                  Utilities.ReduceField.VALUE
+                  .toString(), "", false)),
+          new RowSchema(extractOpRR.getColumnInfos()),
+          input), extractOpRR);
+
+      /*
+       * c. Rebuild the QueryDef.
+       * Why?
+       * - so that the ExprNodeDescriptors in the QueryDef are based on the
+       *   Extract Operator's RowResolver
+       */
+      rr = opParseCtx.get(input).getRowResolver();
+      ptfDesc = translatePTFInvocationSpec(ptfQSpec, rr);
+
+      /*
+       * d. Construct PTF Operator.
+       */
+      RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+      input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+          new RowSchema(ptfOpRR.getColumnInfos()),
+          input), ptfOpRR);
+
+    }
+
+    return input;
+
+  }
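
Putting the pieces of genPTFPlanForComponentQuery together, each PTF component yields the operator pipeline: optional map-side PTF, ReduceSink, Extract, then the reduce-side PTF operator. A schematic sketch with a hypothetical Op type (not Hive's Operator hierarchy):

// Minimal sketch of the per-component operator pipeline built above.
public class PTFPipelineSketch {
  static class Op {
    String name; Op parent;
    Op(String name, Op parent) { this.name = name; this.parent = parent; }
  }

  static Op genComponentPlan(Op input, boolean transformsRawInput) {
    if (transformsRawInput) {
      input = new Op("PTF(map-side)", input);  // reshape raw input before the shuffle
    }
    input = new Op("ReduceSink", input);       // shuffle on the partition/order columns
    input = new Op("Extract", input);          // expose the value columns on the reduce side
    input = new Op("PTF", input);              // evaluate the partitioned table function
    return input;
  }

  public static void main(String[] args) {
    Op top = genComponentPlan(new Op("TableScan", null), false);
    // prints the chain from the top operator back to the table scan
    for (Op o = top; o != null; o = o.parent) {
      System.out.print(o.name + (o.parent != null ? " <- " : "\n"));
    }
  }
}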
+
+//--------------------------- Windowing handling: PTFInvocationSpec to PTFDesc --------------------
+
+  Operator genWindowingPlan(WindowingSpec wSpec, Operator input) throws SemanticException {
+    wSpec.validateAndMakeEffective();
+    WindowingComponentizer groups = new WindowingComponentizer(wSpec);
+    RowResolver rr = opParseCtx.get(input).getRowResolver();
+
+    while(groups.hasNext() ) {
+      wSpec = groups.next(conf, this, unparseTranslator, rr);
+      input = genReduceSinkPlanForWindowing(wSpec, rr, input);
+      rr = opParseCtx.get(input).getRowResolver();
+      PTFTranslator translator = new PTFTranslator();
+      PTFDesc ptfDesc = translator.translate(wSpec, this, conf, rr, unparseTranslator);
+      RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
+      input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
+          new RowSchema(ptfOpRR.getColumnInfos()),
+          input), ptfOpRR);
+      rr = ptfOpRR;
+    }
+
+    return input;
+  }
+
+  private Operator genReduceSinkPlanForWindowing(WindowingSpec spec,
+      RowResolver inputRR,
+      Operator input) throws SemanticException{
+    ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
+    ArrayList<ExprNodeDesc> orderCols = new ArrayList<ExprNodeDesc>();
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+    List<String> outputColumnNames = new ArrayList<String>();
+    StringBuilder orderString = new StringBuilder();
+
+    ArrayList<PartitionExpression> partColList = spec.getQueryPartitionSpec().getExpressions();
+    for (PartitionExpression partCol : partColList) {
+      ExprNodeDesc partExpr = genExprNodeDesc(partCol.getExpression(), inputRR);
+      partCols.add(partExpr);
+      orderCols.add(partExpr);
+      orderString.append('+');
+    }
+
+    ArrayList<OrderExpression> orderColList = spec.getQueryOrderSpec() == null ?
+        new ArrayList<PTFInvocationSpec.OrderExpression>() :
+          spec.getQueryOrderSpec().getExpressions();
+    for (int i = 0; i < orderColList.size(); i++) {
+      OrderExpression orderCol = orderColList.get(i);
+      org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order order = orderCol.getOrder();
+      if (order.name().equals("ASC")) {
+        orderString.append('+');
+      } else {
+        orderString.append('-');
+      }
+      ExprNodeDesc orderExpr = genExprNodeDesc(orderCol.getExpression(), inputRR);
+      orderCols.add(orderExpr);
+    }
+
+    ArrayList<ColumnInfo> colInfoList = inputRR.getColumnInfos();
+    RowResolver rsNewRR = new RowResolver();
+    int pos = 0;
+    for (ColumnInfo colInfo : colInfoList) {
+      ExprNodeDesc valueColExpr = new ExprNodeColumnDesc(colInfo.getType(), colInfo
+          .getInternalName(), colInfo.getTabAlias(), colInfo
+          .getIsVirtualCol());
+      valueCols.add(valueColExpr);
+      colExprMap.put(colInfo.getInternalName(), valueColExpr);
+      String outColName = SemanticAnalyzer.getColumnInternalName(pos++);
+      outputColumnNames.add(outColName);
+
+      String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+      ColumnInfo newColInfo = new ColumnInfo(
+          outColName, colInfo.getType(), alias[0],
+          colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+      rsNewRR.put(alias[0], alias[1], newColInfo);
+    }
+
+    input = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
+        .getReduceSinkDesc(orderCols,
+            valueCols, outputColumnNames, false,
+            -1, partCols, orderString.toString(), -1),
+        new RowSchema(inputRR.getColumnInfos()), input), rsNewRR);
+    input.setColumnExprMap(colExprMap);
+
+    // Construct the RR for the extract operator
+    RowResolver extractRR = new RowResolver();
+    LinkedHashMap<String[], ColumnInfo> colsAddedByHaving =
+        new LinkedHashMap<String[], ColumnInfo>();
+    pos = 0;
+
+    for (ColumnInfo colInfo : colInfoList) {
+      String[] alias = inputRR.reverseLookup(colInfo.getInternalName());
+      /*
+       * If we have already encountered this colInfo's internalName, skip it here.
+       * We encounter it again because it must also be added for the Having clause.
+       * Those entries are added at the end, in a loop over colsAddedByHaving. See below.
+       */
+      if ( colsAddedByHaving.containsKey(alias)) {
+        continue;
+      }
+      ASTNode astNode = PTFTranslator.getASTNode(colInfo, inputRR);
+      ColumnInfo eColInfo = new ColumnInfo(
+          SemanticAnalyzer.getColumnInternalName(pos++), colInfo.getType(), alias[0],
+          colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol());
+
+      if ( astNode == null ) {
+        extractRR.put(alias[0], alias[1], eColInfo);
+      }
+      else {
+        /*
+         * In case the Having clause refers to this column, it may have been added twice:
+         * once with the ASTNode.toStringTree as the alias
+         * and once with the real alias.
+         */
+        extractRR.putExpression(astNode, eColInfo);
+        if ( !astNode.toStringTree().toLowerCase().equals(alias[1]) ) {
+          colsAddedByHaving.put(alias, eColInfo);
+        }
+      }
+    }
+
+    for(Map.Entry<String[], ColumnInfo> columnAddedByHaving : colsAddedByHaving.entrySet() ) {
+      String[] alias = columnAddedByHaving.getKey();
+      ColumnInfo eColInfo = columnAddedByHaving.getValue();
+      extractRR.put(alias[0], alias[1], eColInfo);
+    }
+
+    /*
+     * b. Construct Extract Operator.
+     */
+    input = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new ExtractDesc(
+            new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+                Utilities.ReduceField.VALUE
+                .toString(), "", false)),
+        new RowSchema(inputRR.getColumnInfos()),
+        input), extractRR);
+
+    return input;
+  }
+
+
+  public static ArrayList<WindowExpressionSpec> parseSelect(String selectExprStr)
+      throws SemanticException
+  {
+    ASTNode selNode = null;
+    try {
+      ParseDriver pd = new ParseDriver();
+      selNode = pd.parseSelect(selectExprStr, null);
+    } catch (ParseException pe) {
+      throw new SemanticException(pe);
+    }
+
+    ArrayList<WindowExpressionSpec> selSpec = new ArrayList<WindowExpressionSpec>();
+    int childCount = selNode.getChildCount();
+    for (int i = 0; i < childCount; i++) {
+      ASTNode selExpr = (ASTNode) selNode.getChild(i);
+      if (selExpr.getType() != HiveParser.TOK_SELEXPR) {
+        throw new SemanticException(String.format(
+            "Only Select expressions supported in dynamic select list: %s", selectExprStr));
+      }
+      ASTNode expr = (ASTNode) selExpr.getChild(0);
+      if (expr.getType() == HiveParser.TOK_ALLCOLREF) {
+        throw new SemanticException(
+            String.format("'%s' column not allowed in dynamic select list", selectExprStr));
+      }
+      ASTNode aliasNode = selExpr.getChildCount() > 1
+          && selExpr.getChild(1).getType() == HiveParser.Identifier ?
+          (ASTNode) selExpr.getChild(1) : null;
+      String alias = null;
+      if ( aliasNode != null ) {
+        alias = aliasNode.getText();
+      }
+      else {
+        String[] tabColAlias = getColAlias(selExpr, null, null, true, -1);
+        alias = tabColAlias[1];
+      }
+      WindowExpressionSpec exprSpec = new WindowExpressionSpec();
+      exprSpec.setAlias(alias);
+      exprSpec.setExpression(expr);
+      selSpec.add(exprSpec);
+    }
+
+    return selSpec;
+  }
+
 }
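
A minimal usage sketch of the new static parseSelect helper added above: it turns a dynamic select-list string into WindowExpressionSpecs. The class name and example columns below are made up, and the getAlias()/getExpression() accessors are assumed to exist alongside the setAlias()/setExpression() calls shown in the diff.

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;

public class ParseSelectSketch {
  public static void main(String[] args) throws SemanticException {
    // Parse a dynamic select list that mixes a windowing invocation and a plain column.
    ArrayList<WindowExpressionSpec> specs = SemanticAnalyzer.parseSelect(
        "rank() over (partition by p_mfgr order by p_name) as r, p_size");
    for (WindowExpressionSpec spec : specs) {
      // getAlias()/getExpression() are assumed here; the patch only shows the setters.
      System.out.println(spec.getAlias() + " -> " + spec.getExpression().toStringTree());
    }
  }
}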

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java Fri Apr  5 10:34:08 2013
@@ -492,6 +492,7 @@ public final class TypeCheckProcFactory 
     static HashMap<Integer, String> specialUnaryOperatorTextHashMap;
     static HashMap<Integer, String> specialFunctionTextHashMap;
     static HashMap<Integer, String> conversionFunctionTextHashMap;
+    static HashSet<Integer> windowingTokens;
     static {
       specialUnaryOperatorTextHashMap = new HashMap<Integer, String>();
       specialUnaryOperatorTextHashMap.put(HiveParser.PLUS, "positive");
@@ -522,6 +523,22 @@ public final class TypeCheckProcFactory 
           serdeConstants.TIMESTAMP_TYPE_NAME);
       conversionFunctionTextHashMap.put(HiveParser.TOK_DECIMAL,
           serdeConstants.DECIMAL_TYPE_NAME);
+
+      windowingTokens = new HashSet<Integer>();
+      windowingTokens.add(HiveParser.KW_OVER);
+      windowingTokens.add(HiveParser.TOK_PARTITIONINGSPEC);
+      windowingTokens.add(HiveParser.TOK_DISTRIBUTEBY);
+      windowingTokens.add(HiveParser.TOK_SORTBY);
+      windowingTokens.add(HiveParser.TOK_CLUSTERBY);
+      windowingTokens.add(HiveParser.TOK_WINDOWSPEC);
+      windowingTokens.add(HiveParser.TOK_WINDOWRANGE);
+      windowingTokens.add(HiveParser.TOK_WINDOWVALUES);
+      windowingTokens.add(HiveParser.KW_UNBOUNDED);
+      windowingTokens.add(HiveParser.KW_PRECEDING);
+      windowingTokens.add(HiveParser.KW_FOLLOWING);
+      windowingTokens.add(HiveParser.KW_CURRENT);
+      windowingTokens.add(HiveParser.TOK_TABSORTCOLNAMEASC);
+      windowingTokens.add(HiveParser.TOK_TABSORTCOLNAMEDESC);
     }
 
     private static boolean isRedundantConversionFunction(ASTNode expr,
@@ -865,6 +882,20 @@ public final class TypeCheckProcFactory 
 
       ASTNode expr = (ASTNode) nd;
 
+      /*
+       * A Windowing specification gets added as a child to a UDAF invocation to distinguish it
+       * from otherwise similar UDAFs on different windows.
+       * The UDAF is translated to a WindowFunction invocation in the PTFTranslator.
+       * So here we just return null for tokens that appear in a Window Specification.
+       * When the traversal reaches the UDAF invocation, its ExprNodeDesc is built using the
+       * ColumnInfo in the input RR. This is similar to how UDAFs are handled in Select lists.
+       * The difference is that the Window related tokens are translated separately (by the
+       * PTFTranslator), so here we just return null.
+      if ( windowingTokens.contains(expr.getType())) {
+        return null;
+      }
+
       if (expr.getType() == HiveParser.TOK_TABNAME) {
         return null;
       }
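
A small standalone sketch of the guard described in the comment above: window-specification tokens are collected into a set once, and the expression walker returns null for them so the PTFTranslator can handle them later. The class name and token constants here are made-up stand-ins for the generated HiveParser values.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class WindowTokenGuardSketch {
  // Made-up stand-ins for the HiveParser token constants used in the patch.
  static final int KW_OVER = 100;
  static final int TOK_WINDOWSPEC = 101;
  static final int TOK_TABLE_OR_COL = 102;

  static final Set<Integer> WINDOWING_TOKENS =
      new HashSet<Integer>(Arrays.asList(KW_OVER, TOK_WINDOWSPEC));

  // Returns null for window-spec tokens and a placeholder "descriptor" otherwise,
  // mirroring the early return added above.
  static String process(int tokenType) {
    if (WINDOWING_TOKENS.contains(tokenType)) {
      return null;
    }
    return "ExprNodeDesc(" + tokenType + ")";
  }

  public static void main(String[] args) {
    System.out.println(process(KW_OVER));          // null: left to the PTFTranslator
    System.out.println(process(TOK_TABLE_OR_COL)); // ExprNodeDesc(102)
  }
}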

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java Fri Apr  5 10:34:08 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+
 /**
  * ColumnStats Work.
  *
@@ -29,6 +31,8 @@ public class ColumnStatsWork implements 
   private static final long serialVersionUID = 1L;
   private FetchWork fWork;
   private ColumnStatsDesc colStats;
+  private static final int LIMIT = -1;
+
 
   public ColumnStatsWork() {
   }
@@ -61,4 +65,21 @@ public class ColumnStatsWork implements 
   public void setColStats(ColumnStatsDesc colStats) {
     this.colStats = colStats;
   }
+
+  public ListSinkOperator getSink() {
+    return fWork.getSink();
+  }
+
+  public void initializeForFetch() {
+    fWork.initializeForFetch();
+  }
+
+  public int getLeastNumRows() {
+    return fWork.getLeastNumRows();
+  }
+
+  public static int getLimit() {
+    return LIMIT;
+  }
+
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Fri Apr  5 10:34:08 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 
@@ -46,7 +47,7 @@ public class TableScanDesc extends Abstr
   /**
    * Used for split sampling (row count per split)
    * For example,
-   *   select count(1) from ss_src2 tablesample(10 ROWS);
+   *   select count(1) from ss_src2 tablesample (10 ROWS) s;
    * provides first 10 rows from all input splits
    */
   private int rowLimit = -1;
@@ -67,6 +68,9 @@ public class TableScanDesc extends Abstr
   public static final String FILTER_TEXT_CONF_STR =
     "hive.io.filter.text";
 
+  // input file name (big) to bucket number
+  private Map<String, Integer> bucketFileNameMapping;
+
   @SuppressWarnings("nls")
   public TableScanDesc() {
   }
@@ -170,4 +174,12 @@ public class TableScanDesc extends Abstr
   public Integer getRowLimitExplain() {
     return rowLimit >= 0 ? rowLimit : null;
   }
+
+  public Map<String, Integer> getBucketFileNameMapping() {
+    return bucketFileNameMapping;
+  }
+
+  public void setBucketFileNameMapping(Map<String, Integer> bucketFileNameMapping) {
+    this.bucketFileNameMapping = bucketFileNameMapping;
+  }
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Fri Apr  5 10:34:08 2013
@@ -846,6 +846,10 @@ public final class OpProcFactory {
     return new DefaultPPD();
   }
 
+  public static NodeProcessor getPTFProc() {
+    return new ScriptPPD();
+  }
+
   public static NodeProcessor getSCRProc() {
     return new ScriptPPD();
   }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java Fri Apr  5 10:34:08 2013
@@ -21,15 +21,15 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.UDTFOperator;
-import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -90,6 +90,9 @@ public class PredicatePushDown implement
     opRules.put(new RuleRegExp("R1",
       FilterOperator.getOperatorName() + "%"),
       OpProcFactory.getFilterProc());
+    opRules.put(new RuleRegExp("R2",
+        PTFOperator.getOperatorName() + "%"),
+        OpProcFactory.getPTFProc());
     opRules.put(new RuleRegExp("R3",
       CommonJoinOperator.getOperatorName() + "%"),
       OpProcFactory.getJoinProc());

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFRound.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFRound.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFRound.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFRound.java Fri Apr  5 10:34:08 2013
@@ -24,8 +24,11 @@ import java.math.RoundingMode;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.serde2.io.BigDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 
 /**
  * UDFRound.
@@ -38,6 +41,10 @@ import org.apache.hadoop.io.IntWritable;
 public class UDFRound extends UDF {
   private final BigDecimalWritable bigDecimalWritable = new BigDecimalWritable();
   private final DoubleWritable doubleWritable = new DoubleWritable();
+  private final LongWritable longWritable = new LongWritable();
+  private final IntWritable intWritable = new IntWritable();
+  private final ShortWritable shortWritable = new ShortWritable();
+  private final ByteWritable byteWritable = new ByteWritable();
 
   public UDFRound() {
   }
@@ -48,7 +55,7 @@ public class UDFRound extends UDF {
       doubleWritable.set(d);
     } else {
       doubleWritable.set(BigDecimal.valueOf(d).setScale(i,
-          RoundingMode.HALF_UP).doubleValue());
+              RoundingMode.HALF_UP).doubleValue());
     }
     return doubleWritable;
   }
@@ -88,4 +95,41 @@ public class UDFRound extends UDF {
     return evaluate(n, i.get());
   }
 
+  public LongWritable evaluate(LongWritable n) {
+    if (n == null) {
+      return null;
+    }
+    longWritable.set(BigDecimal.valueOf(n.get()).setScale(0,
+            RoundingMode.HALF_UP).longValue());
+    return longWritable;
+  }
+
+  public IntWritable evaluate(IntWritable n) {
+    if (n == null) {
+      return null;
+    }
+    intWritable.set(BigDecimal.valueOf(n.get()).setScale(0,
+            RoundingMode.HALF_UP).intValue());
+    return intWritable;
+  }
+
+  public ShortWritable evaluate(ShortWritable n) {
+    if (n == null) {
+      return null;
+    }
+    shortWritable.set(BigDecimal.valueOf(n.get()).setScale(0,
+            RoundingMode.HALF_UP).shortValue());
+    return shortWritable;
+  }
+
+  public ByteWritable evaluate(ByteWritable n) {
+    if (n == null) {
+      return null;
+    }
+    byteWritable.set(BigDecimal.valueOf(n.get()).setScale(0,
+            RoundingMode.HALF_UP).byteValue());
+    return byteWritable;
+  }
+
 }
+
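
A brief standalone sketch of the HALF_UP rounding reused by the new integral overloads; the class name is made up. At scale 0 an integral input comes back unchanged, and .5 fractions round away from zero, the same rounding mode used by the existing double path.

import java.math.BigDecimal;
import java.math.RoundingMode;

public class HalfUpSketch {
  public static void main(String[] args) {
    // For integral inputs, rounding at scale 0 is a no-op, so the new overloads
    // effectively return the input value unchanged.
    System.out.println(BigDecimal.valueOf(7L).setScale(0, RoundingMode.HALF_UP).longValue());   // 7
    // HALF_UP rounds .5 away from zero.
    System.out.println(BigDecimal.valueOf(2.5).setScale(0, RoundingMode.HALF_UP).intValue());   // 3
    System.out.println(BigDecimal.valueOf(-2.5).setScale(0, RoundingMode.HALF_UP).intValue());  // -3
  }
}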

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java?rev=1464915&r1=1464914&r2=1464915&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFComputeStats.java Fri Apr  5 10:34:08 2013
@@ -229,24 +229,32 @@ public class GenericUDAFComputeStats ext
     public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
       Object p = parameters[0];
       BooleanStatsAgg myagg = (BooleanStatsAgg) agg;
-      if (p == null) {
-        myagg.countNulls++;
+      boolean emptyTable = false;
+
+      if (parameters[1] == null) {
+        emptyTable = true;
       }
-      else {
-        try {
-          boolean v = PrimitiveObjectInspectorUtils.getBoolean(p, inputOI);
-          if (v == false) {
-            myagg.countFalses++;
-          } else if (v == true){
-            myagg.countTrues++;
-          }
-        } catch (NumberFormatException e) {
-          if (!warned) {
-            warned = true;
-            LOG.warn(getClass().getSimpleName() + " "
-                + StringUtils.stringifyException(e));
-            LOG.warn(getClass().getSimpleName()
-                + " ignoring similar exceptions.");
+
+      if (!emptyTable) {
+        if (p == null) {
+          myagg.countNulls++;
+        }
+        else {
+          try {
+            boolean v = PrimitiveObjectInspectorUtils.getBoolean(p, inputOI);
+            if (v == false) {
+              myagg.countFalses++;
+            } else if (v == true){
+              myagg.countTrues++;
+            }
+          } catch (NumberFormatException e) {
+            if (!warned) {
+              warned = true;
+              LOG.warn(getClass().getSimpleName() + " "
+                  + StringUtils.stringifyException(e));
+              LOG.warn(getClass().getSimpleName()
+                  + " ignoring similar exceptions.");
+            }
           }
         }
       }
@@ -472,14 +480,24 @@ public class GenericUDAFComputeStats ext
     public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
       Object p = parameters[0];
       LongStatsAgg myagg = (LongStatsAgg) agg;
+      boolean emptyTable = false;
+
+      if (parameters[1] == null) {
+        emptyTable = true;
+      }
 
       if (myagg.firstItem) {
-        int numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+        int numVectors = 0;
+        if (!emptyTable) {
+          numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+        }
         initNDVEstimator(myagg, numVectors);
         myagg.firstItem = false;
         myagg.numBitVectors = numVectors;
       }
 
+      if (!emptyTable) {
+
       //Update null counter if a null value is seen
       if (p == null) {
         myagg.countNulls++;
@@ -511,6 +529,7 @@ public class GenericUDAFComputeStats ext
           }
         }
       }
+      }
     }
 
     @Override
@@ -572,7 +591,11 @@ public class GenericUDAFComputeStats ext
     @Override
     public Object terminate(AggregationBuffer agg) throws HiveException {
       LongStatsAgg myagg = (LongStatsAgg) agg;
-      long numDV = myagg.numDV.estimateNumDistinctValues();
+
+      long numDV = 0;
+      if (myagg.numBitVectors != 0) {
+        numDV = myagg.numDV.estimateNumDistinctValues();
+      }
 
       // Serialize the result struct
       ((Text) result[0]).set(myagg.columnType);
@@ -770,42 +793,54 @@ public class GenericUDAFComputeStats ext
     public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
       Object p = parameters[0];
       DoubleStatsAgg myagg = (DoubleStatsAgg) agg;
+      boolean emptyTable = false;
+
+      if (parameters[1] == null) {
+        emptyTable = true;
+      }
 
       if (myagg.firstItem) {
-        int numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+        int numVectors = 0;
+        if (!emptyTable) {
+          numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+        }
         initNDVEstimator(myagg, numVectors);
         myagg.firstItem = false;
         myagg.numBitVectors = numVectors;
       }
 
-      //Update null counter if a null value is seen
-      if (p == null) {
-        myagg.countNulls++;
-      }
-      else {
-        try {
-          double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
-
-          //Update min counter if new value is less than min seen so far
-          if (v < myagg.min) {
-            myagg.min = v;
-          }
+      if (!emptyTable) {
 
-          //Update max counter if new value is greater than max seen so far
-          if (v > myagg.max) {
-            myagg.max = v;
-          }
+        //Update null counter if a null value is seen
+        if (p == null) {
+          myagg.countNulls++;
+        }
+        else {
+          try {
 
-          // Add value to NumDistinctValue Estimator
-          myagg.numDV.addToEstimator(v);
+            double v = PrimitiveObjectInspectorUtils.getDouble(p, inputOI);
 
-        } catch (NumberFormatException e) {
-          if (!warned) {
-            warned = true;
-            LOG.warn(getClass().getSimpleName() + " "
-                + StringUtils.stringifyException(e));
-            LOG.warn(getClass().getSimpleName()
-                + " ignoring similar exceptions.");
+            //Update min counter if new value is less than min seen so far
+            if (v < myagg.min) {
+              myagg.min = v;
+            }
+
+            //Update max counter if new value is greater than max seen so far
+            if (v > myagg.max) {
+              myagg.max = v;
+            }
+
+            // Add value to NumDistinctValue Estimator
+            myagg.numDV.addToEstimator(v);
+
+          } catch (NumberFormatException e) {
+            if (!warned) {
+              warned = true;
+              LOG.warn(getClass().getSimpleName() + " "
+                  + StringUtils.stringifyException(e));
+              LOG.warn(getClass().getSimpleName()
+                  + " ignoring similar exceptions.");
+            }
           }
         }
       }
@@ -870,7 +905,11 @@ public class GenericUDAFComputeStats ext
     @Override
     public Object terminate(AggregationBuffer agg) throws HiveException {
       DoubleStatsAgg myagg = (DoubleStatsAgg) agg;
-      long numDV = myagg.numDV.estimateNumDistinctValues();
+      long numDV = 0;
+
+      if (myagg.numBitVectors != 0) {
+        numDV = myagg.numDV.estimateNumDistinctValues();
+      }
 
       // Serialize the result struct
       ((Text) result[0]).set(myagg.columnType);
@@ -1082,44 +1121,56 @@ public class GenericUDAFComputeStats ext
     public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
       Object p = parameters[0];
       StringStatsAgg myagg = (StringStatsAgg) agg;
+      boolean emptyTable = false;
+
+      if (parameters[1] == null) {
+        emptyTable = true;
+      }
 
       if (myagg.firstItem) {
-        int numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+        int numVectors = 0;
+        if (!emptyTable) {
+          numVectors = PrimitiveObjectInspectorUtils.getInt(parameters[1], numVectorsOI);
+        }
         initNDVEstimator(myagg, numVectors);
         myagg.firstItem = false;
         myagg.numBitVectors = numVectors;
       }
 
-      // Update null counter if a null value is seen
-      if (p == null) {
-        myagg.countNulls++;
-      }
-      else {
-        try {
-          String v = PrimitiveObjectInspectorUtils.getString(p, inputOI);
-
-          // Update max length if new length is greater than the ones seen so far
-          int len = v.length();
-          if (len > myagg.maxLength) {
-            myagg.maxLength = len;
-          }
-
-          // Update sum length with the new length
-          myagg.sumLength += len;
+      if (!emptyTable) {
 
-          // Increment count of values seen so far
-          myagg.count++;
+        // Update null counter if a null value is seen
+        if (p == null) {
+          myagg.countNulls++;
+        }
+        else {
+          try {
 
-          // Add string value to NumDistinctValue Estimator
-          myagg.numDV.addToEstimator(v);
+            String v = PrimitiveObjectInspectorUtils.getString(p, inputOI);
 
-        } catch (NumberFormatException e) {
-          if (!warned) {
-            warned = true;
-            LOG.warn(getClass().getSimpleName() + " "
-                + StringUtils.stringifyException(e));
-            LOG.warn(getClass().getSimpleName()
-                + " ignoring similar exceptions.");
+            // Update max length if new length is greater than the ones seen so far
+            int len = v.length();
+            if (len > myagg.maxLength) {
+              myagg.maxLength = len;
+            }
+
+            // Update sum length with the new length
+            myagg.sumLength += len;
+
+            // Increment count of values seen so far
+            myagg.count++;
+
+            // Add string value to NumDistinctValue Estimator
+            myagg.numDV.addToEstimator(v);
+
+          } catch (NumberFormatException e) {
+            if (!warned) {
+              warned = true;
+              LOG.warn(getClass().getSimpleName() + " "
+                  + StringUtils.stringifyException(e));
+              LOG.warn(getClass().getSimpleName()
+                  + " ignoring similar exceptions.");
+            }
           }
         }
       }
@@ -1186,8 +1237,18 @@ public class GenericUDAFComputeStats ext
     @Override
     public Object terminate(AggregationBuffer agg) throws HiveException {
       StringStatsAgg myagg = (StringStatsAgg) agg;
-      long numDV = myagg.numDV.estimateNumDistinctValues();
-      double avgLength = (double)(myagg.sumLength/(1.0 * (myagg.count + myagg.countNulls)));
+
+      long numDV = 0;
+      double avgLength = 0.0;
+      long total = myagg.count + myagg.countNulls;
+
+      if (myagg.numBitVectors != 0) {
+        numDV = myagg.numDV.estimateNumDistinctValues();
+      }
+
+      if (total != 0) {
+         avgLength = (double)(myagg.sumLength / (1.0 * total));
+      }
 
       // Serialize the result struct
       ((Text) result[0]).set(myagg.columnType);
@@ -1347,34 +1408,41 @@ public class GenericUDAFComputeStats ext
     public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
       Object p = parameters[0];
       BinaryStatsAgg myagg = (BinaryStatsAgg) agg;
+      boolean emptyTable = false;
 
-      // Update null counter if a null value is seen
-      if (p == null) {
-        myagg.countNulls++;
+      if (parameters[1] == null) {
+        emptyTable = true;
       }
-      else {
-        try {
-          BytesWritable v = PrimitiveObjectInspectorUtils.getBinary(p, inputOI);
-
-          // Update max length if new length is greater than the ones seen so far
-          int len = v.getLength();
-          if (len > myagg.maxLength) {
-            myagg.maxLength = len;
-          }
-
-          // Update sum length with the new length
-          myagg.sumLength += len;
 
-          // Increment count of values seen so far
-          myagg.count++;
-
-        } catch (NumberFormatException e) {
-          if (!warned) {
-            warned = true;
-            LOG.warn(getClass().getSimpleName() + " "
-                + StringUtils.stringifyException(e));
-            LOG.warn(getClass().getSimpleName()
-                + " ignoring similar exceptions.");
+      if (!emptyTable) {
+        // Update null counter if a null value is seen
+        if (p == null) {
+          myagg.countNulls++;
+        }
+        else {
+          try {
+            BytesWritable v = PrimitiveObjectInspectorUtils.getBinary(p, inputOI);
+
+            // Update max length if new length is greater than the ones seen so far
+            int len = v.getLength();
+            if (len > myagg.maxLength) {
+              myagg.maxLength = len;
+            }
+
+            // Update sum length with the new length
+            myagg.sumLength += len;
+
+            // Increment count of values seen so far
+            myagg.count++;
+
+          } catch (NumberFormatException e) {
+            if (!warned) {
+              warned = true;
+              LOG.warn(getClass().getSimpleName() + " "
+                  + StringUtils.stringifyException(e));
+              LOG.warn(getClass().getSimpleName()
+                  + " ignoring similar exceptions.");
+            }
           }
         }
       }
@@ -1440,7 +1508,12 @@ public class GenericUDAFComputeStats ext
     @Override
     public Object terminate(AggregationBuffer agg) throws HiveException {
       BinaryStatsAgg myagg = (BinaryStatsAgg) agg;
-      double avgLength = (double)(myagg.sumLength/(1.0 * (myagg.count + myagg.countNulls)));
+      double avgLength = 0.0;
+      long count = myagg.count + myagg.countNulls;
+
+      if (count != 0) {
+        avgLength = (double)(myagg.sumLength / (1.0 * (myagg.count + myagg.countNulls)));
+      }
 
       // Serialize the result struct
       ((Text) result[0]).set(myagg.columnType);