You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/29 17:44:42 UTC

svn commit: r1518680 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: exec/ optimizer/ parse/ plan/ udf/ptf/

Author: hashutosh
Date: Thu Aug 29 15:44:42 2013
New Revision: 1518680

URL: http://svn.apache.org/r1518680
Log:
HIVE-4964 : Cleanup PTF code: remove code dealing with non-standard SQL behavior we had originally introduced (Harish Butani via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Thu Aug 29 15:44:42 2013
@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
@@ -44,11 +42,9 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-public class PTFOperator extends Operator<PTFDesc> implements Serializable
-{
+public class PTFOperator extends Operator<PTFDesc> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 	PTFPartition inputPart;
@@ -67,8 +63,7 @@ public class PTFOperator extends Operato
 	 * 4. Create input partition to store rows coming from previous operator
 	 */
 	@Override
-	protected void initializeOp(Configuration jobConf) throws HiveException
-	{
+	protected void initializeOp(Configuration jobConf) throws HiveException {
 		hiveConf = new HiveConf(jobConf, PTFOperator.class);
 		// if the parent is ExtractOperator, this invocation is from reduce-side
 		Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
@@ -78,13 +73,10 @@ public class PTFOperator extends Operato
     inputPart = createFirstPartitionForChain(
         inputObjInspectors[0], hiveConf, isMapOperator);
 
-		if (isMapOperator)
-		{
+		if (isMapOperator) {
 			PartitionedTableFunctionDef tDef = conf.getStartOfChain();
 			outputObjInspector = tDef.getRawInputShape().getOI();
-		}
-		else
-		{
+		} else {
 			outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
 		}
 
@@ -94,16 +86,12 @@ public class PTFOperator extends Operato
 	}
 
 	@Override
-	protected void closeOp(boolean abort) throws HiveException
-	{
+	protected void closeOp(boolean abort) throws HiveException {
 		super.closeOp(abort);
     if(inputPart.size() != 0){
-      if (isMapOperator)
-      {
+      if (isMapOperator) {
         processMapFunction();
-      }
-      else
-      {
+      } else {
         processInputPartition();
       }
     }
@@ -113,8 +101,7 @@ public class PTFOperator extends Operato
 	@Override
 	public void processOp(Object row, int tag) throws HiveException
 	{
-	  if (!isMapOperator )
-    {
+	  if (!isMapOperator ) {
       /*
        * checkif current row belongs to the current accumulated Partition:
        * - If not:
@@ -126,20 +113,15 @@ public class PTFOperator extends Operato
       boolean keysAreEqual = (currentKeys != null && newKeys != null)?
               newKeys.equals(currentKeys) : false;
 
-      if (currentKeys != null && !keysAreEqual)
-      {
+      if (currentKeys != null && !keysAreEqual) {
         processInputPartition();
         inputPart.reset();
       }
 
-      if (currentKeys == null || !keysAreEqual)
-      {
-        if (currentKeys == null)
-        {
+      if (currentKeys == null || !keysAreEqual) {
+        if (currentKeys == null) {
           currentKeys = newKeys.copyKey();
-        }
-        else
-        {
+        } else {
           currentKeys.copyKey(newKeys);
         }
       }
@@ -156,16 +138,14 @@ public class PTFOperator extends Operato
 	 * @param hiveConf
 	 * @throws HiveException
 	 */
-	protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
-	{
+	protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException {
 
 	  PTFDeserializer dS =
 	      new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
 	  dS.initializePTFChain(conf.getFuncDef());
 	}
 
-	protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
-	{
+	protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
 		PartitionDef pDef = conf.getStartOfChain().getPartition();
 		ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
 		int numExprs = exprs.size();
@@ -173,8 +153,7 @@ public class PTFOperator extends Operato
 		ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
 		ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
 
-		for(int i=0; i<numExprs; i++)
-		{
+		for(int i=0; i<numExprs; i++) {
 		  PTFExpressionDef exprDef = exprs.get(i);
 			/*
 			 * Why cannot we just use the ExprNodeEvaluator on the column?
@@ -192,29 +171,20 @@ public class PTFOperator extends Operato
 	  newKeys = keyWrapperFactory.getKeyWrapper();
 	}
 
-	protected void processInputPartition() throws HiveException
-	{
+	protected void processInputPartition() throws HiveException {
 	  PTFPartition outPart = executeChain(inputPart);
-	  if ( conf.forWindowing() ) {
-	    executeWindowExprs(outPart);
-	  }
-	  else {
-	    PTFPartitionIterator<Object> pItr = outPart.iterator();
-	    while (pItr.hasNext())
-	    {
-	      Object oRow = pItr.next();
-	      forward(oRow, outputObjInspector);
-	    }
-	  }
+	  PTFPartitionIterator<Object> pItr = outPart.iterator();
+    while (pItr.hasNext()) {
+      Object oRow = pItr.next();
+      forward(oRow, outputObjInspector);
+    }
 	}
 
-	protected void processMapFunction() throws HiveException
-	{
+	protected void processMapFunction() throws HiveException {
 	  PartitionedTableFunctionDef tDef = conf.getStartOfChain();
     PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
     PTFPartitionIterator<Object> pItr = outPart.iterator();
-    while (pItr.hasNext())
-    {
+    while (pItr.hasNext()) {
       Object oRow = pItr.next();
       forward(oRow, outputObjInspector);
     }
@@ -234,8 +204,7 @@ public class PTFOperator extends Operato
 
 
 	@Override
-	public OperatorType getType()
-	{
+	public OperatorType getType() {
 		return OperatorType.PTF;
 	}
 
@@ -250,124 +219,23 @@ public class PTFOperator extends Operato
    * @throws HiveException
    */
   private PTFPartition executeChain(PTFPartition part)
-      throws HiveException
-  {
+      throws HiveException {
     Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
     PTFInputDef iDef = conf.getFuncDef();
-    while (true)
-    {
-      if (iDef instanceof PartitionedTableFunctionDef)
-      {
-        fnDefs.push((PartitionedTableFunctionDef) iDef);
-        iDef = ((PartitionedTableFunctionDef) iDef).getInput();
-      }
-      else
-      {
-        break;
-      }
+
+    while (iDef instanceof PartitionedTableFunctionDef) {
+      fnDefs.push((PartitionedTableFunctionDef) iDef);
+      iDef = ((PartitionedTableFunctionDef) iDef).getInput();
     }
 
     PartitionedTableFunctionDef currFnDef;
-    while (!fnDefs.isEmpty())
-    {
+    while (!fnDefs.isEmpty()) {
       currFnDef = fnDefs.pop();
       part = currFnDef.getTFunction().execute(part);
     }
     return part;
   }
 
-  /**
-   * If WindowingSpec contains any Windowing Expressions or has a
-   * Having condition then these are processed
-   * and forwarded on. Whereas when there is no having or WdwExprs
-   * just forward the rows in the output Partition.
-   *
-   * For e.g. Consider the following query:
-   * <pre>
-   * {@code
-   *  select rank(), lead(rank(),1),...
-   *  from xyz
-   *  ...
-   *  having rank() > 1
-   *  }
-   * </pre>
-   * rank() gets processed as a WdwFn; Its in the oPart(output partition)
-   * argument to executeWindowExprs. Here we first evaluate the having expression.
-   * So the first row of oPart gets filtered out.
-   * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
-   *
-   * @param ptfDesc
-   * @param oPart output partition after Window Fns are processed.
-   * @param op
-   * @throws HiveException
-   */
-  private void executeWindowExprs(PTFPartition oPart)
-      throws HiveException
-  {
-    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
-    /*
-     * inputOI represents the row with WindowFn results present.
-     * So in the e.g. above it will have a column for 'rank()'
-     */
-    StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
-    /*
-     * outputOI represents the final row with the Windowing Expressions added.
-     * So in the e.g. above it will have a column for 'lead(rank())' in addition to
-     * all columns in inputOI.
-     */
-    StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
-    int numCols = outputOI.getAllStructFieldRefs().size();
-    ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
-    int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
-    Object[] output = new Object[numCols];
-
-    /*
-     * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
-     * we can just forward the row in the oPart partition.
-     */
-    boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
-
-    PTFPartitionIterator<Object> pItr = oPart.iterator();
-    PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
-    while (pItr.hasNext())
-    {
-      int colCnt = 0;
-      Object oRow = pItr.next();
-
-      /*
-       * when there is no Windowing expressions or having;
-       * just forward the Object coming out of the Partition.
-       */
-      if ( forwardRowsUntouched ) {
-        forward(oRow, outputObjInspector);
-        continue;
-      }
-
-      /*
-       * Setup the output row columns in the following order
-       * - the columns in the SelectList processed by the PTF
-       * (ie the Select Exprs that have navigation expressions)
-       * - the columns from the final PTF.
-       */
-
-      if ( wdwExprs != null ) {
-        for (WindowExpressionDef wdwExpr : wdwExprs)
-        {
-          Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
-          output[colCnt++] = newCol;
-        }
-      }
-
-      for(; colCnt < numCols; ) {
-        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
-        output[colCnt++] =
-            ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
-            field.getFieldObjectInspector());
-      }
-
-      forward(output, outputObjInspector);
-    }
-  }
 
   /**
    * Create a new Partition.
@@ -390,8 +258,7 @@ public class PTFOperator extends Operato
    * @throws HiveException
    */
   public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
-      HiveConf hiveConf, boolean isMapSide) throws HiveException
-  {
+      HiveConf hiveConf, boolean isMapSide) throws HiveException {
     PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
     TableFunctionEvaluator tEval = tabDef.getTFunction();
 
@@ -410,14 +277,12 @@ public class PTFOperator extends Operato
   }
 
   public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
-      PTFPartitionIterator<Object> pItr) throws HiveException
-  {
+      PTFPartitionIterator<Object> pItr) throws HiveException {
     List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
     if (llFnDescs == null) {
       return;
     }
-    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
-    {
+    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs) {
       GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
           .getGenericUDF();
       llFn.setpItr(pItr);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Aug 29 15:44:42 2013
@@ -68,7 +68,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -223,12 +222,6 @@ public final class ColumnPrunerProcFacto
           }
         }
       }
-      if ( tDef.getWindowExpressions() != null ) {
-        for(WindowExpressionDef expr : tDef.getWindowExpressions()) {
-          ExprNodeDesc exprNode = expr.getExprNode();
-          Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
-        }
-      }
      if(tDef.getPartition() != null){
          for(PTFExpressionDef col : tDef.getPartition().getExpressions()){
            ExprNodeDesc exprNode = col.getExprNode();

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Thu Aug 29 15:44:42 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.RangeBoundaryDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ValueBoundaryDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFrameDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -200,83 +199,26 @@ public class PTFTranslator {
     /*
      * set outputFromWdwFnProcessing
      */
-    if (windowFunctions.size() > 0) {
-      ArrayList<String> aliases = new ArrayList<String>();
-      ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-      for (WindowFunctionDef wFnDef : windowFunctions) {
-        aliases.add(wFnDef.getAlias());
-        if (wFnDef.isPivotResult()) {
-          fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
-        } else {
-          fieldOIs.add(wFnDef.getOI());
-        }
-      }
-      PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
-      StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
-          aliases, fieldOIs);
-      tFn.setWdwProcessingOutputOI(wdwOutOI);
-      RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef, false);
-      ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
-      wdwTFnDef.setOutputFromWdwFnProcessing(wdwOutShape);
-    }
-    else {
-      wdwTFnDef.setOutputFromWdwFnProcessing(inpShape);
-    }
-
-    /*
-     * process Wdw expressions
-     */
-    ShapeDetails wdwOutShape = wdwTFnDef.getOutputFromWdwFnProcessing();
-    ArrayList<WindowExpressionDef> windowExpressions = new ArrayList<WindowExpressionDef>();
-    if (wdwSpec.getWindowExpressions() != null) {
-      for (WindowExpressionSpec expr : wdwSpec.getWindowExpressions()) {
-        if (!(expr instanceof WindowFunctionSpec)) {
-          try {
-            PTFExpressionDef eDef = buildExpressionDef(wdwOutShape, expr.getExpression());
-            WindowExpressionDef wdwEDef = new WindowExpressionDef(eDef);
-            wdwEDef.setAlias(expr.getAlias());
-            windowExpressions.add(wdwEDef);
-          } catch (HiveException he) {
-            throw new SemanticException(he);
-          }
-        }
+    ArrayList<String> aliases = new ArrayList<String>();
+    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+    for (WindowFunctionDef wFnDef : windowFunctions) {
+      aliases.add(wFnDef.getAlias());
+      if (wFnDef.isPivotResult()) {
+        fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
+      } else {
+        fieldOIs.add(wFnDef.getOI());
       }
-      wdwTFnDef.setWindowExpressions(windowExpressions);
-    }
-
-    /*
-     * set outputOI
-     */
-    if (windowExpressions.size() > 0) {
-      ArrayList<String> aliases = new ArrayList<String>();
-      ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-      for (WindowExpressionDef wEDef : windowExpressions) {
-        aliases.add(wEDef.getAlias());
-        fieldOIs.add(wEDef.getOI());
-      }
-      PTFTranslator.addInputColumnsToList(wdwOutShape, aliases, fieldOIs);
-      StructObjectInspector outOI = ObjectInspectorFactory.getStandardStructObjectInspector(
-          aliases, fieldOIs);
-      RowResolver outRR = buildRowResolverForWindowing(wdwTFnDef, true);
-      ShapeDetails outShape = setupShape(outOI, null, outRR);
-      wdwTFnDef.setOutputShape(outShape);
-    }
-    else {
-      wdwTFnDef.setOutputShape(copyShape(wdwOutShape));
     }
+    PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
+    StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+        aliases, fieldOIs);
+    tFn.setWdwProcessingOutputOI(wdwOutOI);
+    RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef);
+    ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
+    wdwTFnDef.setOutputShape(wdwOutShape);
 
     tFn.setupOutputOI();
 
-    /*
-     * If we have windowExpressions then we convert to Std. Object to process;
-     * we just stream these rows; no need to put in an output Partition.
-     */
-    if (windowExpressions.size() > 0) {
-      StructObjectInspector oi = (StructObjectInspector)
-          ObjectInspectorUtils.getStandardObjectInspector(wdwTFnDef.getOutputShape().getOI());
-      wdwTFnDef.getOutputShape().setOI(oi);
-    }
-
     return ptfDesc;
   }
 
@@ -949,23 +891,10 @@ public class PTFTranslator {
     return rwsch;
   }
 
-  protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def,
-      boolean addWdwExprs) throws SemanticException {
+  protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def)
+      throws SemanticException {
     RowResolver rr = new RowResolver();
     HashMap<String, WindowExpressionSpec> aliasToExprMap = windowingSpec.getAliasToWdwExpr();
-    /*
-     * add Window Expressions
-     */
-    if (addWdwExprs) {
-      for (WindowExpressionDef wEDef : def.getWindowExpressions()) {
-        ASTNode ast = aliasToExprMap.get(wEDef.getAlias()).getExpression();
-        ColumnInfo cInfo = new ColumnInfo(wEDef.getAlias(),
-            TypeInfoUtils.getTypeInfoFromObjectInspector(wEDef.getOI()),
-            null,
-            false);
-        rr.putExpression(ast, cInfo);
-      }
-    }
 
     /*
      * add Window Functions

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Thu Aug 29 15:44:42 2013
@@ -67,19 +67,6 @@ public class WindowingSpec {
     windowSpecs.put(name, wdwSpec);
   }
 
-  public void addExpression(ASTNode expr, String alias) {
-    windowExpressions = windowExpressions == null ?
-        new ArrayList<WindowExpressionSpec>() : windowExpressions;
-    aliasToWdwExpr = aliasToWdwExpr == null ?
-        new HashMap<String, WindowExpressionSpec>() : aliasToWdwExpr;
-    WindowExpressionSpec wExprSpec = new WindowExpressionSpec();
-    wExprSpec.setAlias(alias);
-    wExprSpec.setExpression(expr);
-
-    windowExpressions.add(wExprSpec);
-    aliasToWdwExpr.put(alias, wExprSpec);
-  }
-
   public void addWindowFunction(WindowFunctionSpec wFn) {
     windowExpressions = windowExpressions == null ?
         new ArrayList<WindowExpressionSpec>() : windowExpressions;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java Thu Aug 29 15:44:42 2013
@@ -239,28 +239,8 @@ public class PTFDesc extends AbstractOpe
   }
 
   public static class WindowTableFunctionDef extends PartitionedTableFunctionDef {
-    ArrayList<WindowExpressionDef> windowExpressions;
     ArrayList<WindowFunctionDef> windowFunctions;
-    /*
-     * this shape omits the non WdwFunction Expressions. Expr Evaluators for the Window Expressions is based on this
-     * shape, so they can refer to the Wdw Function values.
-     * @note: this will eventually be removed, as plan is to push Wdw expression processing to separate Select Op after
-     * PTF Op.
-     */
-    ShapeDetails outputFromWdwFnProcessing;
-
-    public ArrayList<WindowExpressionDef> getWindowExpressions() {
-      return windowExpressions;
-    }
-    public void setWindowExpressions(ArrayList<WindowExpressionDef> windowExpressions) {
-      this.windowExpressions = windowExpressions;
-    }
-    public ShapeDetails getOutputFromWdwFnProcessing() {
-      return outputFromWdwFnProcessing;
-    }
-    public void setOutputFromWdwFnProcessing(ShapeDetails outputFromWdwFnProcessing) {
-      this.outputFromWdwFnProcessing = outputFromWdwFnProcessing;
-    }
+
     public ArrayList<WindowFunctionDef> getWindowFunctions() {
       return windowFunctions;
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDeserializer.java Thu Aug 29 15:44:42 2013
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ValueBoundaryDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFrameDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -53,7 +52,6 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
@@ -114,67 +112,37 @@ public class PTFDeserializer {
     /*
      * 2. initialize WFns.
      */
-    if (def.getWindowFunctions() != null) {
-      for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
-
-        if (wFnDef.getArgs() != null) {
-          for (PTFExpressionDef arg : wFnDef.getArgs()) {
-            initialize(arg, inpShape);
-          }
+    for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
+      if (wFnDef.getArgs() != null) {
+        for (PTFExpressionDef arg : wFnDef.getArgs()) {
+          initialize(arg, inpShape);
         }
 
-        if (wFnDef.getWindowFrame() != null) {
-          WindowFrameDef wFrmDef = wFnDef.getWindowFrame();
-          initialize(wFrmDef.getStart(), inpShape);
-          initialize(wFrmDef.getEnd(), inpShape);
-        }
-        setupWdwFnEvaluator(wFnDef);
       }
-      ArrayList<String> aliases = new ArrayList<String>();
-      ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-      for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
-        aliases.add(wFnDef.getAlias());
-        if (wFnDef.isPivotResult()) {
-          fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
-        } else {
-          fieldOIs.add(wFnDef.getOI());
-        }
+      if (wFnDef.getWindowFrame() != null) {
+        WindowFrameDef wFrmDef = wFnDef.getWindowFrame();
+        initialize(wFrmDef.getStart(), inpShape);
+        initialize(wFrmDef.getEnd(), inpShape);
       }
-      PTFDeserializer.addInputColumnsToList(inpShape, aliases, fieldOIs);
-      StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
-          aliases, fieldOIs);
-      tResolver.setWdwProcessingOutputOI(wdwOutOI);
-      initialize(def.getOutputFromWdwFnProcessing(), wdwOutOI);
-    } else {
-      def.setOutputFromWdwFnProcessing(inpShape);
+      setupWdwFnEvaluator(wFnDef);
     }
-
-    inpShape = def.getOutputFromWdwFnProcessing();
-
-    /*
-     * 3. initialize WExprs. + having clause
-     */
-    if (def.getWindowExpressions() != null) {
-      for (WindowExpressionDef wEDef : def.getWindowExpressions()) {
-        initialize(wEDef, inpShape);
+    ArrayList<String> aliases = new ArrayList<String>();
+    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+    for (WindowFunctionDef wFnDef : def.getWindowFunctions()) {
+      aliases.add(wFnDef.getAlias());
+      if (wFnDef.isPivotResult()) {
+        fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
+      } else {
+        fieldOIs.add(wFnDef.getOI());
       }
     }
 
-    /*
-     * 4. give Evaluator chance to setup for Output execution; setup Output shape.
-     */
+    PTFDeserializer.addInputColumnsToList(inpShape, aliases, fieldOIs);
+    StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+        aliases, fieldOIs);
+    tResolver.setWdwProcessingOutputOI(wdwOutOI);
+    initialize(def.getOutputShape(), wdwOutOI);
     tResolver.initializeOutputOI();
-    initialize(def.getOutputShape(), tEval.getOutputOI());
-
-    /*
-     * If we have windowExpressions then we convert to Std. Object to process;
-     * we just stream these rows; no need to put in an output Partition.
-     */
-    if (def.getWindowExpressions().size() > 0) {
-      StructObjectInspector oi = (StructObjectInspector)
-          ObjectInspectorUtils.getStandardObjectInspector(def.getOutputShape().getOI());
-      def.getOutputShape().setOI(oi);
-    }
   }
 
   protected void initialize(PTFQueryInputDef def, StructObjectInspector OI) throws HiveException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java?rev=1518680&r1=1518679&r2=1518680&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java Thu Aug 29 15:44:42 2013
@@ -46,34 +46,12 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 
-public class WindowingTableFunction extends TableFunctionEvaluator
-{
-
-  @Override
-  public PTFPartition execute(PTFPartition iPart)
-      throws HiveException
-  {
-    WindowTableFunctionDef wFnDef = (WindowTableFunctionDef) getTableDef();
-    PTFPartitionIterator<Object> pItr = iPart.iterator();
-    PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, pItr);
-
-    if ( outputPartition == null ) {
-      outputPartition = PTFPartition.create(ptfDesc.getCfg(),
-          wFnDef.getOutputFromWdwFnProcessing().getSerde(),
-          OI, wFnDef.getOutputFromWdwFnProcessing().getOI());
-    }
-    else {
-      outputPartition.reset();
-    }
-
-    execute(pItr, outputPartition);
-    return outputPartition;
-  }
+@SuppressWarnings("deprecation")
+public class WindowingTableFunction extends TableFunctionEvaluator {
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Override
-  public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException
-  {
+  public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {
     ArrayList<List<?>> oColumns = new ArrayList<List<?>>();
     PTFPartition iPart = pItr.getPartition();
     StructObjectInspector inputOI;
@@ -82,36 +60,29 @@ public class WindowingTableFunction exte
     WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
     Order order = wTFnDef.getOrder().getExpressions().get(0).getOrder();
 
-    for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions())
-    {
+    for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
       boolean processWindow = processWindow(wFn);
       pItr.reset();
-      if ( !processWindow )
-      {
+      if ( !processWindow ) {
         GenericUDAFEvaluator fEval = wFn.getWFnEval();
         Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
         AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
-        while(pItr.hasNext())
-        {
+        while(pItr.hasNext()) {
           Object row = pItr.next();
           int i =0;
           if ( wFn.getArgs() != null ) {
-            for(PTFExpressionDef arg : wFn.getArgs())
-            {
+            for(PTFExpressionDef arg : wFn.getArgs()) {
               args[i++] = arg.getExprEvaluator().evaluate(row);
             }
           }
           fEval.aggregate(aggBuffer, args);
         }
         Object out = fEval.evaluate(aggBuffer);
-        if ( !wFn.isPivotResult())
-        {
+        if ( !wFn.isPivotResult()) {
           out = new SameList(iPart.size(), out);
         }
         oColumns.add((List<?>)out);
-      }
-      else
-      {
+      } else {
         oColumns.add(executeFnwithWindow(getQueryDef(), wFn, iPart, order));
       }
     }
@@ -122,18 +93,15 @@ public class WindowingTableFunction exte
      * - the input Rows columns
      */
 
-    for(int i=0; i < iPart.size(); i++)
-    {
+    for(int i=0; i < iPart.size(); i++) {
       ArrayList oRow = new ArrayList();
       Object iRow = iPart.getAt(i);
 
-      for(int j=0; j < oColumns.size(); j++)
-      {
+      for(int j=0; j < oColumns.size(); j++) {
         oRow.add(oColumns.get(j).get(i));
       }
 
-      for(StructField f : inputOI.getAllStructFieldRefs())
-      {
+      for(StructField f : inputOI.getAllStructFieldRefs()) {
         oRow.add(inputOI.getStructFieldData(iRow, f));
       }
 
@@ -187,15 +155,13 @@ public class WindowingTableFunction exte
      * - the Window Functions.
      */
     @Override
-    public void initializeOutputOI() throws HiveException
-    {
+    public void initializeOutputOI() throws HiveException {
       setupOutputOI();
     }
 
 
     @Override
-    public boolean transformsRawInput()
-    {
+    public boolean transformsRawInput() {
       return false;
     }
 
@@ -225,26 +191,22 @@ public class WindowingTableFunction exte
       WindowFunctionDef wFnDef,
       PTFPartition iPart,
       Order order)
-    throws HiveException
-  {
+    throws HiveException {
     ArrayList<Object> vals = new ArrayList<Object>();
 
     GenericUDAFEvaluator fEval = wFnDef.getWFnEval();
 
     Object[] args = new Object[wFnDef.getArgs() == null ? 0 : wFnDef.getArgs().size()];
-    for(int i=0; i < iPart.size(); i++)
-    {
+    for(int i=0; i < iPart.size(); i++) {
       AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
       Range rng = getRange(wFnDef, i, iPart, order);
       PTFPartitionIterator<Object> rItr = rng.iterator();
       PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-      while(rItr.hasNext())
-      {
+      while(rItr.hasNext()) {
         Object row = rItr.next();
         int j = 0;
         if ( wFnDef.getArgs() != null ) {
-          for(PTFExpressionDef arg : wFnDef.getArgs())
-          {
+          for(PTFExpressionDef arg : wFnDef.getArgs()) {
             args[j++] = arg.getExprEvaluator().evaluate(row);
           }
         }
@@ -701,8 +663,7 @@ public class WindowingTableFunction exte
       }
     }
 
-    public Object computeValue(Object row) throws HiveException
-    {
+    public Object computeValue(Object row) throws HiveException {
       Object o = expressionDef.getExprEvaluator().evaluate(row);
       return ObjectInspectorUtils.copyToStandardObject(o, expressionDef.getOI());
     }
@@ -713,11 +674,10 @@ public class WindowingTableFunction exte
 
 
     @SuppressWarnings("incomplete-switch")
-    public static ValueBoundaryScanner getScanner(ValueBoundaryDef vbDef, Order order) throws HiveException
-    {
+    public static ValueBoundaryScanner getScanner(ValueBoundaryDef vbDef, Order order)
+        throws HiveException {
       PrimitiveObjectInspector pOI = (PrimitiveObjectInspector) vbDef.getOI();
-      switch(pOI.getPrimitiveCategory())
-      {
+      switch(pOI.getPrimitiveCategory()) {
       case BYTE:
       case INT:
       case LONG:
@@ -736,16 +696,14 @@ public class WindowingTableFunction exte
     }
   }
 
-  public static class LongValueBoundaryScanner extends ValueBoundaryScanner
-  {
-    public LongValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
-    {
+  public static class LongValueBoundaryScanner extends ValueBoundaryScanner {
+    public LongValueBoundaryScanner(BoundaryDef bndDef, Order order,
+        PTFExpressionDef expressionDef) {
       super(bndDef,order,expressionDef);
     }
 
     @Override
-    public boolean isGreater(Object v1, Object v2, int amt)
-    {
+    public boolean isGreater(Object v1, Object v2, int amt) {
       long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
           (PrimitiveObjectInspector) expressionDef.getOI());
       long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
@@ -754,8 +712,7 @@ public class WindowingTableFunction exte
     }
 
     @Override
-    public boolean isEqual(Object v1, Object v2)
-    {
+    public boolean isEqual(Object v1, Object v2) {
       long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
           (PrimitiveObjectInspector) expressionDef.getOI());
       long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
@@ -764,16 +721,14 @@ public class WindowingTableFunction exte
     }
   }
 
-  public static class DoubleValueBoundaryScanner extends ValueBoundaryScanner
-  {
-    public DoubleValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
-    {
+  public static class DoubleValueBoundaryScanner extends ValueBoundaryScanner {
+    public DoubleValueBoundaryScanner(BoundaryDef bndDef, Order order,
+        PTFExpressionDef expressionDef) {
       super(bndDef,order,expressionDef);
     }
 
     @Override
-    public boolean isGreater(Object v1, Object v2, int amt)
-    {
+    public boolean isGreater(Object v1, Object v2, int amt) {
       double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
           (PrimitiveObjectInspector) expressionDef.getOI());
       double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
@@ -782,8 +737,7 @@ public class WindowingTableFunction exte
     }
 
     @Override
-    public boolean isEqual(Object v1, Object v2)
-    {
+    public boolean isEqual(Object v1, Object v2) {
       double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
           (PrimitiveObjectInspector) expressionDef.getOI());
       double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
@@ -792,16 +746,14 @@ public class WindowingTableFunction exte
     }
   }
 
-  public static class StringValueBoundaryScanner extends ValueBoundaryScanner
-  {
-    public StringValueBoundaryScanner(BoundaryDef bndDef, Order order, PTFExpressionDef expressionDef)
-    {
+  public static class StringValueBoundaryScanner extends ValueBoundaryScanner {
+    public StringValueBoundaryScanner(BoundaryDef bndDef, Order order,
+        PTFExpressionDef expressionDef) {
       super(bndDef,order,expressionDef);
     }
 
     @Override
-    public boolean isGreater(Object v1, Object v2, int amt)
-    {
+    public boolean isGreater(Object v1, Object v2, int amt) {
       String s1 = PrimitiveObjectInspectorUtils.getString(v1,
           (PrimitiveObjectInspector) expressionDef.getOI());
       String s2 = PrimitiveObjectInspectorUtils.getString(v2,
@@ -810,8 +762,7 @@ public class WindowingTableFunction exte
     }
 
     @Override
-    public boolean isEqual(Object v1, Object v2)
-    {
+    public boolean isEqual(Object v1, Object v2) {
       String s1 = PrimitiveObjectInspectorUtils.getString(v1,
           (PrimitiveObjectInspector) expressionDef.getOI());
       String s2 = PrimitiveObjectInspectorUtils.getString(v2,
@@ -820,26 +771,22 @@ public class WindowingTableFunction exte
     }
   }
 
-  public static class SameList<E> extends AbstractList<E>
-  {
+  public static class SameList<E> extends AbstractList<E> {
     int sz;
     E val;
 
-    public SameList(int sz, E val)
-    {
+    public SameList(int sz, E val) {
       this.sz = sz;
       this.val = val;
     }
 
     @Override
-    public E get(int index)
-    {
+    public E get(int index) {
       return val;
     }
 
     @Override
-    public int size()
-    {
+    public int size() {
       return sz;
     }