You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ai...@apache.org on 2017/01/12 14:25:56 UTC

[12/13] hive git commit: HIVE-15520: Improve the sum performance for Range based window (Aihua Xu, reviewed by Yongzhi Chen)

http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 2fdb492..86783be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.udf.ptf;
 
 import java.util.AbstractList;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -28,25 +27,19 @@ import java.util.Map;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.PTFPartition;
 import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
 import org.apache.hadoop.hive.ql.exec.PTFRollingPartition;
 import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
-import org.apache.hadoop.hive.ql.plan.ptf.OrderDef;
-import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
@@ -61,7 +54,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,16 +104,16 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
 
     WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
     for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
-      boolean processWindow = processWindow(wFn);
+      boolean processWindow = processWindow(wFn.getWindowFrame());
       pItr.reset();
       if ( !processWindow ) {
-        Object out = evaluateWindowFunction(wFn, pItr);
+        Object out = evaluateFunctionOnPartition(wFn, iPart);
         if ( !wFn.isPivotResult()) {
           out = new SameList(iPart.size(), out);
         }
         oColumns.add((List<?>)out);
       } else {
-        oColumns.add(executeFnwithWindow(getQueryDef(), wFn, iPart));
+        oColumns.add(executeFnwithWindow(wFn, iPart));
       }
     }
 
@@ -147,30 +139,36 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
     }
   }
 
-  Object evaluateWindowFunction(WindowFunctionDef wFn,
-      PTFPartitionIterator<Object> pItr) throws HiveException {
-    GenericUDAFEvaluator fEval = wFn.getWFnEval();
-    Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
-    AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
-    while(pItr.hasNext())
-    {
-      Object row = pItr.next();
-      int i =0;
-      if ( wFn.getArgs() != null ) {
-        for(PTFExpressionDef arg : wFn.getArgs())
-        {
-          args[i++] = arg.getExprEvaluator().evaluate(row);
-        }
-      }
-      fEval.aggregate(aggBuffer, args);
+  // Evaluate the result given a partition and the row number to process
+  private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)
+      throws HiveException {
+    BasePartitionEvaluator partitionEval = wFn.getWFnEval()
+        .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI());
+    return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo());
+  }
+
+  // Evaluate the result given a partition
+  private Object evaluateFunctionOnPartition(WindowFunctionDef wFn,
+      PTFPartition partition) throws HiveException {
+    BasePartitionEvaluator partitionEval = wFn.getWFnEval()
+        .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI());
+    return partitionEval.getPartitionAgg();
+  }
+
+  // Evaluate the function result for each row in the partition
+  ArrayList<Object> executeFnwithWindow(
+      WindowFunctionDef wFnDef,
+      PTFPartition iPart)
+    throws HiveException {
+    ArrayList<Object> vals = new ArrayList<Object>();
+    for(int i=0; i < iPart.size(); i++) {
+      Object out = evaluateWindowFunction(wFnDef, i, iPart);
+      vals.add(out);
     }
-    Object out = fEval.evaluate(aggBuffer);
-    out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI());
-    return out;
+    return vals;
   }
 
-  private boolean processWindow(WindowFunctionDef wFn) {
-    WindowFrameDef frame = wFn.getWindowFrame();
+  private static boolean processWindow(WindowFrameDef frame) {
     if ( frame == null ) {
       return false;
     }
@@ -391,7 +389,7 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
 
     streamingState.rollingPart.append(row);
 
-    WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+    WindowTableFunctionDef tabDef = (WindowTableFunctionDef) tableDef;
 
     for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
       WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
@@ -417,10 +415,7 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
       } else {
         int rowToProcess = streamingState.rollingPart.rowToProcess(wFn.getWindowFrame());
         if (rowToProcess >= 0) {
-          Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart);
-          PTFPartitionIterator<Object> rItr = rng.iterator();
-          PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-          Object out = evaluateWindowFunction(wFn, rItr);
+          Object out =  evaluateWindowFunction(wFn, rowToProcess, streamingState.rollingPart);
           streamingState.fnOutputs[i].add(out);
         }
       }
@@ -495,10 +490,7 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
         while (numRowsRemaining > 0) {
           int rowToProcess = streamingState.rollingPart.size() - numRowsRemaining;
           if (rowToProcess >= 0) {
-            Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart);
-            PTFPartitionIterator<Object> rItr = rng.iterator();
-            PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-            Object out = evaluateWindowFunction(wFn, rItr);
+            Object out = evaluateWindowFunction(wFn, rowToProcess, streamingState.rollingPart);
             streamingState.fnOutputs[i].add(out);
           }
           numRowsRemaining--;
@@ -540,10 +532,10 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
 
     int i=0;
     for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
-      boolean processWindow = processWindow(wFn);
+      boolean processWindow = processWindow(wFn.getWindowFrame());
       pItr.reset();
       if ( !processWindow && !wFn.isPivotResult() ) {
-        Object out = evaluateWindowFunction(wFn, pItr);
+        Object out = evaluateFunctionOnPartition(wFn, iPart);
         output.add(out);
       } else if (wFn.isPivotResult()) {
         GenericUDAFEvaluator streamingEval = wFn.getWFnEval().getWindowingEvaluator(wFn.getWindowFrame());
@@ -558,12 +550,11 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
             output.add(null);
             wFnsWithWindows.add(i);
           } else {
-            outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn,
-                pItr);
+            outputFromPivotFunctions[i] = (List) evaluateFunctionOnPartition(wFn, iPart);
             output.add(null);
           }
         } else {
-          outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
+          outputFromPivotFunctions[i] = (List) evaluateFunctionOnPartition(wFn, iPart);
           output.add(null);
         }
       } else {
@@ -652,797 +643,6 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
 
   }
 
-  ArrayList<Object> executeFnwithWindow(PTFDesc ptfDesc,
-      WindowFunctionDef wFnDef,
-      PTFPartition iPart)
-    throws HiveException {
-    ArrayList<Object> vals = new ArrayList<Object>();
-    for(int i=0; i < iPart.size(); i++) {
-      Range rng = getRange(wFnDef, i, iPart);
-      PTFPartitionIterator<Object> rItr = rng.iterator();
-      PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-      Object out = evaluateWindowFunction(wFnDef, rItr);
-      vals.add(out);
-    }
-    return vals;
-  }
-
-  private Range getRange(WindowFunctionDef wFnDef, int currRow, PTFPartition p) throws HiveException
-  {
-    WindowFrameDef winFrame = wFnDef.getWindowFrame();
-    BoundaryDef startB = winFrame.getStart();
-    BoundaryDef endB = winFrame.getEnd();
-
-    int start, end;
-    if (winFrame.getWindowType() == WindowType.ROWS) {
-      start = getRowBoundaryStart(startB, currRow);
-      end = getRowBoundaryEnd(endB, currRow, p);
-    } else {
-      ValueBoundaryScanner vbs = ValueBoundaryScanner.getScanner(winFrame);
-      start = vbs.computeStart(currRow, p);
-      end = vbs.computeEnd(currRow, p);
-    }
-    start = start < 0 ? 0 : start;
-    end = end > p.size() ? p.size() : end;
-    return new Range(start, end, p);
-  }
-
-  private int getRowBoundaryStart(BoundaryDef b, int currRow) throws HiveException {
-    Direction d = b.getDirection();
-    int amt = b.getAmt();
-    switch(d) {
-    case PRECEDING:
-      if (amt == BoundarySpec.UNBOUNDED_AMOUNT) {
-        return 0;
-      }
-      else {
-        return currRow - amt;
-      }
-    case CURRENT:
-      return currRow;
-    case FOLLOWING:
-      return currRow + amt;
-    }
-    throw new HiveException("Unknown Start Boundary Direction: " + d);
-  }
-
-  private int getRowBoundaryEnd(BoundaryDef b, int currRow, PTFPartition p) throws HiveException {
-    Direction d = b.getDirection();
-    int amt = b.getAmt();
-    switch(d) {
-    case PRECEDING:
-      if ( amt == 0 ) {
-        return currRow + 1;
-      }
-      return currRow - amt + 1;
-    case CURRENT:
-      return currRow + 1;
-    case FOLLOWING:
-      if (amt == BoundarySpec.UNBOUNDED_AMOUNT) {
-        return p.size();
-      }
-      else {
-        return currRow + amt + 1;
-      }
-    }
-    throw new HiveException("Unknown End Boundary Direction: " + d);
-  }
-
-  static class Range
-  {
-    int start;
-    int end;
-    PTFPartition p;
-
-    public Range(int start, int end, PTFPartition p)
-    {
-      super();
-      this.start = start;
-      this.end = end;
-      this.p = p;
-    }
-
-    public PTFPartitionIterator<Object> iterator()
-    {
-      return p.range(start, end);
-    }
-  }
-
-
-  static abstract class ValueBoundaryScanner {
-    BoundaryDef start, end;
-
-    public ValueBoundaryScanner(BoundaryDef start, BoundaryDef end) {
-      this.start = start;
-      this.end = end;
-    }
-
-    protected abstract int computeStart(int rowIdx, PTFPartition p) throws HiveException;
-
-    protected abstract int computeEnd(int rowIdx, PTFPartition p) throws HiveException;
-
-    public static ValueBoundaryScanner getScanner(WindowFrameDef winFrameDef)
-        throws HiveException {
-      OrderDef orderDef = winFrameDef.getOrderDef();
-      int numOrders = orderDef.getExpressions().size();
-      if (numOrders != 1) {
-        return new MultiValueBoundaryScanner(winFrameDef.getStart(), winFrameDef.getEnd(), orderDef);
-      } else {
-        return SingleValueBoundaryScanner.getScanner(winFrameDef.getStart(), winFrameDef.getEnd(), orderDef);
-      }
-    }
-  }
-
-  /*
-   * - starting from the given rowIdx scan in the given direction until a row's expr
-   * evaluates to an amt that crosses the 'amt' threshold specified in the BoundaryDef.
-   */
-  static abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
-    OrderExpressionDef expressionDef;
-
-    public SingleValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end);
-      this.expressionDef = expressionDef;
-    }
-
-    /*
-|  Use | Boundary1.type | Boundary1. amt | Sort Key | Order | Behavior                          |
-| Case |                |                |          |       |                                   |
-|------+----------------+----------------+----------+-------+-----------------------------------|
-|   1. | PRECEDING      | UNB            | ANY      | ANY   | start = 0                         |
-|   2. | PRECEDING      | unsigned int   | NULL     | ASC   | start = 0                         |
-|   3. |                |                |          | DESC  | scan backwards to row R2          |
-|      |                |                |          |       | such that R2.sk is not null       |
-|      |                |                |          |       | start = R2.idx + 1                |
-|   4. | PRECEDING      | unsigned int   | not NULL | DESC  | scan backwards until row R2       |
-|      |                |                |          |       | such that R2.sk - R.sk > amt      |
-|      |                |                |          |       | start = R2.idx + 1                |
-|   5. | PRECEDING      | unsigned int   | not NULL | ASC   | scan backward until row R2        |
-|      |                |                |          |       | such that R.sk - R2.sk > bnd1.amt |
-|      |                |                |          |       | start = R2.idx + 1                |
-|   6. | CURRENT ROW    |                | NULL     | ANY   | scan backwards until row R2       |
-|      |                |                |          |       | such that R2.sk is not null       |
-|      |                |                |          |       | start = R2.idx + 1                |
-|   7. | CURRENT ROW    |                | not NULL | ANY   | scan backwards until row R2       |
-|      |                |                |          |       | such R2.sk != R.sk                |
-|      |                |                |          |       | start = R2.idx + 1                |
-|   8. | FOLLOWING      | UNB            | ANY      | ANY   | Error                             |
-|   9. | FOLLOWING      | unsigned int   | NULL     | DESC  | start = partition.size            |
-|  10. |                |                |          | ASC   | scan forward until R2             |
-|      |                |                |          |       | such that R2.sk is not null       |
-|      |                |                |          |       | start = R2.idx                    |
-|  11. | FOLLOWING      | unsigned int   | not NULL | DESC  | scan forward until row R2         |
-|      |                |                |          |       | such that R.sk - R2.sk > amt      |
-|      |                |                |          |       | start = R2.idx                    |
-|  12. |                |                |          | ASC   | scan forward until row R2         |
-|      |                |                |          |       | such that R2.sk - R.sk > amt      |
-|------+----------------+----------------+----------+-------+-----------------------------------|
-     */
-    @Override
-    protected int computeStart(int rowIdx, PTFPartition p) throws HiveException {
-      switch(start.getDirection()) {
-      case PRECEDING:
-        return computeStartPreceding(rowIdx, p);
-      case CURRENT:
-        return computeStartCurrentRow(rowIdx, p);
-      case FOLLOWING:
-        default:
-          return computeStartFollowing(rowIdx, p);
-      }
-    }
-
-    protected int computeStartPreceding(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = start.getAmt();
-      // Use Case 1.
-      if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
-        return 0;
-      }
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      if ( sortKey == null ) {
-        // Use Case 2.
-        if ( expressionDef.getOrder() == Order.ASC ) {
-          return 0;
-        }
-        else { // Use Case 3.
-          while ( sortKey == null && rowIdx >= 0 ) {
-            --rowIdx;
-            if ( rowIdx >= 0 ) {
-              sortKey = computeValue(p.getAt(rowIdx));
-            }
-          }
-          return rowIdx+1;
-        }
-      }
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      // Use Case 4.
-      if ( expressionDef.getOrder() == Order.DESC ) {
-        while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
-          r--;
-          if ( r >= 0 ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r + 1;
-      }
-      else { // Use Case 5.
-        while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
-          r--;
-          if ( r >= 0 ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r + 1;
-      }
-    }
-
-    protected int computeStartCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      // Use Case 6.
-      if ( sortKey == null ) {
-        while ( sortKey == null && rowIdx >= 0 ) {
-          --rowIdx;
-          if ( rowIdx >= 0 ) {
-            sortKey = computeValue(p.getAt(rowIdx));
-          }
-        }
-        return rowIdx+1;
-      }
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      // Use Case 7.
-      while (r >= 0 && isEqual(rowVal, sortKey) ) {
-        r--;
-        if ( r >= 0 ) {
-          rowVal = computeValue(p.getAt(r));
-        }
-      }
-      return r + 1;
-    }
-
-    protected int computeStartFollowing(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = start.getAmt();
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      if ( sortKey == null ) {
-        // Use Case 9.
-        if ( expressionDef.getOrder() == Order.DESC) {
-          return p.size();
-        }
-        else { // Use Case 10.
-          while (r < p.size() && rowVal == null ) {
-            r++;
-            if ( r < p.size() ) {
-              rowVal = computeValue(p.getAt(r));
-            }
-          }
-          return r;
-        }
-      }
-
-      // Use Case 11.
-      if ( expressionDef.getOrder() == Order.DESC) {
-        while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
-          r++;
-          if ( r < p.size() ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r;
-      }
-      else { // Use Case 12.
-        while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
-          r++;
-          if ( r < p.size() ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r;
-      }
-    }
-
-    /*
-|  Use | Boundary2.type | Boundary2.amt | Sort Key | Order | Behavior                          |
-| Case |                |               |          |       |                                   |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-|   1. | PRECEDING      | UNB           | ANY      | ANY   | Error                             |
-|   2. | PRECEDING      | unsigned int  | NULL     | DESC  | end = partition.size()            |
-|   3. |                |               |          | ASC   | end = 0                           |
-|   4. | PRECEDING      | unsigned int  | not null | DESC  | scan backward until row R2        |
-|      |                |               |          |       | such that R2.sk - R.sk > bnd.amt  |
-|      |                |               |          |       | end = R2.idx + 1                  |
-|   5. | PRECEDING      | unsigned int  | not null | ASC   | scan backward until row R2        |
-|      |                |               |          |       | such that R.sk -  R2.sk > bnd.amt |
-|      |                |               |          |       | end = R2.idx + 1                  |
-|   6. | CURRENT ROW    |               | NULL     | ANY   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk is not null       |
-|      |                |               |          |       | end = R2.idx                      |
-|   7. | CURRENT ROW    |               | not null | ANY   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk != R.sk           |
-|      |                |               |          |       | end = R2.idx                      |
-|   8. | FOLLOWING      | UNB           | ANY      | ANY   | end = partition.size()            |
-|   9. | FOLLOWING      | unsigned int  | NULL     | DESC  | end = partition.size()            |
-|  10. |                |               |          | ASC   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk is not null       |
-|      |                |               |          |       | end = R2.idx                      |
-|  11. | FOLLOWING      | unsigned int  | not NULL | DESC  | scan forward until row R2         |
-|      |                |               |          |       | such R.sk - R2.sk > bnd.amt       |
-|      |                |               |          |       | end = R2.idx                      |
-|  12. |                |               |          | ASC   | scan forward until row R2         |
-|      |                |               |          |       | such R2.sk - R2.sk > bnd.amt      |
-|      |                |               |          |       | end = R2.idx                      |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-     */
-    @Override
-    protected int computeEnd(int rowIdx, PTFPartition p) throws HiveException {
-      switch(end.getDirection()) {
-      case PRECEDING:
-        return computeEndPreceding(rowIdx, p);
-      case CURRENT:
-        return computeEndCurrentRow(rowIdx, p);
-      case FOLLOWING:
-        default:
-          return computeEndFollowing(rowIdx, p);
-      }
-    }
-
-    protected int computeEndPreceding(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = end.getAmt();
-      // Use Case 1.
-      // amt == UNBOUNDED, is caught during translation
-
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      if ( sortKey == null ) {
-        // Use Case 2.
-        if ( expressionDef.getOrder() == Order.DESC ) {
-          return p.size();
-        }
-        else { // Use Case 3.
-          return 0;
-        }
-      }
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      // Use Case 4.
-      if ( expressionDef.getOrder() == Order.DESC ) {
-        while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
-          r--;
-          if ( r >= 0 ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r + 1;
-      }
-      else { // Use Case 5.
-        while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
-          r--;
-          if ( r >= 0 ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r + 1;
-      }
-    }
-
-    protected int computeEndCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      // Use Case 6.
-      if ( sortKey == null ) {
-        while ( sortKey == null && rowIdx < p.size() ) {
-          ++rowIdx;
-          if ( rowIdx < p.size() ) {
-            sortKey = computeValue(p.getAt(rowIdx));
-          }
-        }
-        return rowIdx;
-      }
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      // Use Case 7.
-      while (r < p.size() && isEqual(sortKey, rowVal) ) {
-        r++;
-        if ( r < p.size() ) {
-          rowVal = computeValue(p.getAt(r));
-        }
-      }
-      return r;
-    }
-
-    protected int computeEndFollowing(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = end.getAmt();
-
-      // Use Case 8.
-      if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
-        return p.size();
-      }
-      Object sortKey = computeValue(p.getAt(rowIdx));
-
-      Object rowVal = sortKey;
-      int r = rowIdx;
-
-      if ( sortKey == null ) {
-        // Use Case 9.
-        if ( expressionDef.getOrder() == Order.DESC) {
-          return p.size();
-        }
-        else { // Use Case 10.
-          while (r < p.size() && rowVal == null ) {
-            r++;
-            if ( r < p.size() ) {
-              rowVal = computeValue(p.getAt(r));
-            }
-          }
-          return r;
-        }
-      }
-
-      // Use Case 11.
-      if ( expressionDef.getOrder() == Order.DESC) {
-        while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
-          r++;
-          if ( r < p.size() ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r;
-      }
-      else { // Use Case 12.
-        while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
-          r++;
-          if ( r < p.size() ) {
-            rowVal = computeValue(p.getAt(r));
-          }
-        }
-        return r;
-      }
-    }
-
-    public Object computeValue(Object row) throws HiveException {
-      Object o = expressionDef.getExprEvaluator().evaluate(row);
-      return ObjectInspectorUtils.copyToStandardObject(o, expressionDef.getOI());
-    }
-
-    /**
-     * Checks if the distance of v2 to v1 is greater than the given amt.
-     * @return True if the value of v1 - v2 is greater than amt or either value is null.
-     */
-    public abstract boolean isDistanceGreater(Object v1, Object v2, int amt);
-
-    /**
-     * Checks if the values of v1 or v2 are the same.
-     * @return True if both values are the same or both are nulls.
-     */
-    public abstract boolean isEqual(Object v1, Object v2);
-
-
-    @SuppressWarnings("incomplete-switch")
-    public static SingleValueBoundaryScanner getScanner(BoundaryDef start, BoundaryDef end, OrderDef orderDef)
-        throws HiveException {
-      if (orderDef.getExpressions().size() != 1) {
-        throw new HiveException("Internal error: initializing SingleValueBoundaryScanner with"
-                + " multiple expression for sorting");
-      }
-      OrderExpressionDef exprDef = orderDef.getExpressions().get(0);
-      PrimitiveObjectInspector pOI = (PrimitiveObjectInspector) exprDef.getOI();
-      switch(pOI.getPrimitiveCategory()) {
-      case BYTE:
-      case INT:
-      case LONG:
-      case SHORT:
-      case TIMESTAMP:
-        return new LongValueBoundaryScanner(start, end, exprDef);
-      case DOUBLE:
-      case FLOAT:
-        return new DoubleValueBoundaryScanner(start, end, exprDef);
-      case DECIMAL:
-        return new HiveDecimalValueBoundaryScanner(start, end, exprDef);
-      case DATE:
-        return new DateValueBoundaryScanner(start, end, exprDef);
-      case STRING:
-        return new StringValueBoundaryScanner(start, end, exprDef);
-      }
-      throw new HiveException(
-          String.format("Internal Error: attempt to setup a Window for datatype %s",
-              pOI.getPrimitiveCategory()));
-    }
-  }
-
-  public static class LongValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public LongValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end,expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      if (v1 != null && v2 != null) {
-        long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        return (l1 -l2) > amt;
-      }
-
-      return v1 != null || v2 != null; // True if only one value is null
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      if (v1 != null && v2 != null) {
-        long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        return l1 == l2;
-      }
-
-      return v1 == null && v2 == null; // True if both are null
-    }
-  }
-
-  public static class DoubleValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public DoubleValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end,expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      if (v1 != null && v2 != null) {
-        double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        return (d1 -d2) > amt;
-      }
-
-      return v1 != null || v2 != null; // True if only one value is null
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      if (v1 != null && v2 != null) {
-        double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
-            (PrimitiveObjectInspector) expressionDef.getOI());
-        return d1 == d2;
-      }
-
-      return v1 == null && v2 == null; // True if both are null
-    }
-  }
-
-  public static class HiveDecimalValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public HiveDecimalValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end,expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      HiveDecimal d1 = PrimitiveObjectInspectorUtils.getHiveDecimal(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      HiveDecimal d2 = PrimitiveObjectInspectorUtils.getHiveDecimal(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      if ( d1 != null && d2 != null ) {
-        return d1.subtract(d2).intValue() > amt;  // TODO: lossy conversion!
-      }
-
-      return d1 != null || d2 != null; // True if only one value is null
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      HiveDecimal d1 = PrimitiveObjectInspectorUtils.getHiveDecimal(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      HiveDecimal d2 = PrimitiveObjectInspectorUtils.getHiveDecimal(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      if ( d1 != null && d2 != null ) {
-        return d1.equals(d2);
-      }
-
-      return d1 == null && d2 == null; // True if both are null
-    }
-  }
-
-  public static class DateValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public DateValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end,expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      Date l1 = PrimitiveObjectInspectorUtils.getDate(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      Date l2 = PrimitiveObjectInspectorUtils.getDate(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      if (l1 != null && l2 != null) {
-          return (double)(l1.getTime() - l2.getTime())/1000 > (long)amt * 24 * 3600; // Converts amt days to milliseconds
-      }
-      return l1 != l2; // True if only one date is null
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      Date l1 = PrimitiveObjectInspectorUtils.getDate(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      Date l2 = PrimitiveObjectInspectorUtils.getDate(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      return (l1 == null && l2 == null) || (l1 != null && l1.equals(l2));
-    }
-  }
-
-  public static class StringValueBoundaryScanner extends SingleValueBoundaryScanner {
-    public StringValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
-      super(start, end,expressionDef);
-    }
-
-    @Override
-    public boolean isDistanceGreater(Object v1, Object v2, int amt) {
-      String s1 = PrimitiveObjectInspectorUtils.getString(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      String s2 = PrimitiveObjectInspectorUtils.getString(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      return s1 != null && s2 != null && s1.compareTo(s2) > 0;
-    }
-
-    @Override
-    public boolean isEqual(Object v1, Object v2) {
-      String s1 = PrimitiveObjectInspectorUtils.getString(v1,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      String s2 = PrimitiveObjectInspectorUtils.getString(v2,
-          (PrimitiveObjectInspector) expressionDef.getOI());
-      return (s1 == null && s2 == null) || (s1 != null && s1.equals(s2));
-    }
-  }
-
-  /*
-   */
-  static class MultiValueBoundaryScanner extends ValueBoundaryScanner {
-    OrderDef orderDef;
-
-    public MultiValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderDef orderDef) {
-      super(start, end);
-      this.orderDef = orderDef;
-    }
-
-    /*
-|------+----------------+----------------+----------+-------+-----------------------------------|
-| Use  | Boundary1.type | Boundary1. amt | Sort Key | Order | Behavior                          |
-| Case |                |                |          |       |                                   |
-|------+----------------+----------------+----------+-------+-----------------------------------|
-|   1. | PRECEDING      | UNB            | ANY      | ANY   | start = 0                         |
-|   2. | CURRENT ROW    |                | ANY      | ANY   | scan backwards until row R2       |
-|      |                |                |          |       | such R2.sk != R.sk                |
-|      |                |                |          |       | start = R2.idx + 1                |
-|------+----------------+----------------+----------+-------+-----------------------------------|
-     */
-    @Override
-    protected int computeStart(int rowIdx, PTFPartition p) throws HiveException {
-      switch(start.getDirection()) {
-      case PRECEDING:
-        return computeStartPreceding(rowIdx, p);
-      case CURRENT:
-        return computeStartCurrentRow(rowIdx, p);
-      case FOLLOWING:
-        default:
-          throw new HiveException(
-                  "FOLLOWING not allowed for starting RANGE with multiple expressions in ORDER BY");
-      }
-    }
-
-    protected int computeStartPreceding(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = start.getAmt();
-      if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
-        return 0;
-      }
-      throw new HiveException(
-              "PRECEDING needs UNBOUNDED for RANGE with multiple expressions in ORDER BY");
-    }
-
-    protected int computeStartCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
-      Object[] sortKey = computeValues(p.getAt(rowIdx));
-      Object[] rowVal = sortKey;
-      int r = rowIdx;
-
-      while (r >= 0 && isEqual(rowVal, sortKey) ) {
-        r--;
-        if ( r >= 0 ) {
-          rowVal = computeValues(p.getAt(r));
-        }
-      }
-      return r + 1;
-    }
-
-    /*
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| Use  | Boundary2.type | Boundary2.amt | Sort Key | Order | Behavior                          |
-| Case |                |               |          |       |                                   |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-|   1. | CURRENT ROW    |               | ANY      | ANY   | scan forward until row R2         |
-|      |                |               |          |       | such that R2.sk != R.sk           |
-|      |                |               |          |       | end = R2.idx                      |
-|   2. | FOLLOWING      | UNB           | ANY      | ANY   | end = partition.size()            |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-     */
-    @Override
-    protected int computeEnd(int rowIdx, PTFPartition p) throws HiveException {
-      switch(end.getDirection()) {
-      case PRECEDING:
-        throw new HiveException(
-                "PRECEDING not allowed for finishing RANGE with multiple expressions in ORDER BY");
-      case CURRENT:
-        return computeEndCurrentRow(rowIdx, p);
-      case FOLLOWING:
-        default:
-          return computeEndFollowing(rowIdx, p);
-      }
-    }
-
-    protected int computeEndCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
-      Object[] sortKey = computeValues(p.getAt(rowIdx));
-      Object[] rowVal = sortKey;
-      int r = rowIdx;
-
-      while (r < p.size() && isEqual(sortKey, rowVal) ) {
-        r++;
-        if ( r < p.size() ) {
-          rowVal = computeValues(p.getAt(r));
-        }
-      }
-      return r;
-    }
-
-    protected int computeEndFollowing(int rowIdx, PTFPartition p) throws HiveException {
-      int amt = end.getAmt();
-      if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
-        return p.size();
-      }
-      throw new HiveException(
-              "FOLLOWING needs UNBOUNDED for RANGE with multiple expressions in ORDER BY");
-    }
-
-    public Object[] computeValues(Object row) throws HiveException {
-      Object[] objs = new Object[orderDef.getExpressions().size()];
-      for (int i = 0; i < objs.length; i++) {
-        Object o = orderDef.getExpressions().get(i).getExprEvaluator().evaluate(row);
-        objs[i] = ObjectInspectorUtils.copyToStandardObject(o, orderDef.getExpressions().get(i).getOI());
-      }
-      return objs;
-    }
-
-    public boolean isEqual(Object[] v1, Object[] v2) {
-      assert v1.length == v2.length;
-      for (int i = 0; i < v1.length; i++) {
-        if (v1[i] == null && v2[i] == null) {
-          continue;
-        }
-        if (v1[i] == null || v2[i] == null) {
-          return false;
-        }
-        if (ObjectInspectorUtils.compare(
-                v1[i], orderDef.getExpressions().get(i).getOI(),
-                v2[i], orderDef.getExpressions().get(i).getOI()) != 0) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
   public static class SameList<E> extends AbstractList<E> {
     int sz;
     E val;
@@ -1518,6 +718,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
       return currIdx < iPart.size();
     }
 
+    // Given the data in a partition, evaluate the result for the next row for
+    // streaming and batch mode
     @Override
     public Object next() {
       int i;
@@ -1550,10 +752,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
             }
             output.set(j, out);
           } else {
-            Range rng = getRange(wFn, currIdx, iPart);
-            PTFPartitionIterator<Object> rItr = rng.iterator();
-            PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
-            output.set(j, evaluateWindowFunction(wFn, rItr));
+            Object out = evaluateWindowFunction(wFn, currIdx, iPart);
+            output.set(j, out);
           }
         }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q b/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q
index 97f113c..1877927 100644
--- a/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q
+++ b/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q
@@ -9,7 +9,7 @@ set mapred.reduce.tasks=4;
 select p_mfgr, p_name, p_size,
 rank() over(distribute by p_mfgr sort by p_name) as r,
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
 from part
 ;
 
@@ -44,7 +44,7 @@ select p_mfgr, p_name,
 rank() over(distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr, 
 count(p_size) over(distribute by p_mfgr sort by p_name) as cd, 
-p_retailprice, sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1, 
+p_retailprice, round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1, 
 p_size, p_size - lag(p_size,1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz 
 from part 
 ;
@@ -55,7 +55,7 @@ from (select p_mfgr, p_name,
 rank() over(distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr, 
 count(p_size) over(distribute by p_mfgr sort by p_name) as cd, 
-p_retailprice, sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1, 
+p_retailprice, round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1, 
 p_size, p_size - lag(p_size,1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz 
 from part 
 ) sub1;
@@ -64,7 +64,7 @@ from part
 select abc.p_mfgr, abc.p_name, 
 rank() over(distribute by abc.p_mfgr sort by abc.p_name) as r, 
 dense_rank() over(distribute by abc.p_mfgr sort by abc.p_name) as dr, 
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1, 
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1, 
 abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over(distribute by abc.p_mfgr sort by abc.p_name) as deltaSz 
 from noop(on part 
 partition by p_mfgr 
@@ -82,7 +82,7 @@ from part
 select p_mfgr, p_name, p_size, 
 rank() over(distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row)  as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
 from part  
 ; 
 
@@ -90,7 +90,7 @@ from part
 select p_mfgr, p_name, p_size, 
 rank() over(distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1 
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1 
 from part 
 ;
 
@@ -162,19 +162,19 @@ window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 precedi
 
 -- 18. testUDAFs
 select  p_mfgr,p_name, p_size, 
-sum(p_retailprice) over w1 as s, 
+round(sum(p_retailprice) over w1,2) as s,
 min(p_retailprice) over w1 as mi,
 max(p_retailprice) over w1 as ma,
-avg(p_retailprice) over w1 as ag
+round(avg(p_retailprice) over w1,2) as ag
 from part
 window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 preceding and 2 following);
 
 -- 19. testUDAFsWithGBY
 select  p_mfgr,p_name, p_size, p_retailprice, 
-sum(p_retailprice) over w1 as s, 
+round(sum(p_retailprice) over w1,2) as s,
 min(p_retailprice) as mi ,
 max(p_retailprice) as ma ,
-avg(p_retailprice) over w1 as ag
+round(avg(p_retailprice) over w1,2) as ag
 from part
 group by p_mfgr,p_name, p_size, p_retailprice
 window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 preceding and 2 following);
@@ -222,7 +222,7 @@ window w1 as (distribute by p_mfgr sort by p_brand rows between 2 preceding and
 -- 23. testCreateViewWithWindowingQuery
 create view IF NOT EXISTS mfgr_brand_price_view as 
 select p_mfgr, p_brand, 
-sum(p_retailprice) over w1  as s
+round(sum(p_retailprice) over w1,2) as s
 from part 
 window w1 as (distribute by p_mfgr sort by p_name rows between 2 preceding and current row);
         
@@ -267,7 +267,7 @@ INSERT OVERWRITE TABLE part_1
 select p_mfgr, p_name, p_size, 
 rank() over(distribute by p_mfgr sort by p_name ) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name ) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row)  as s
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
 INSERT OVERWRITE TABLE part_2 
 select  p_mfgr,p_name, p_size,  
 rank() over(distribute by p_mfgr sort by p_name) as r, 
@@ -386,7 +386,7 @@ from part;
 
 -- 38. testPartitioningVariousForms2
 select p_mfgr, p_name, p_size,
-sum(p_retailprice) over (partition by p_mfgr, p_name order by p_mfgr, p_name rows between unbounded preceding and current row) as s1,
+round(sum(p_retailprice) over (partition by p_mfgr, p_name order by p_mfgr, p_name rows between unbounded preceding and current row),2) as s1,
 min(p_retailprice) over (distribute by p_mfgr, p_name sort by p_mfgr, p_name rows between unbounded preceding and current row) as s2,
 max(p_retailprice) over (partition by p_mfgr, p_name order by p_name) as s3
 from part;
@@ -398,22 +398,22 @@ from part;
 
 -- 40. testNoBetweenForRows
 select p_mfgr, p_name, p_size,
-    sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows unbounded preceding) as s1
+    round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows unbounded preceding),2) as s1
      from part ;
 
 -- 41. testNoBetweenForRange
 select p_mfgr, p_name, p_size,
-    sum(p_retailprice) over (distribute by p_mfgr sort by p_size range unbounded preceding) as s1
+    round(sum(p_retailprice) over (distribute by p_mfgr sort by p_size range unbounded preceding),2) as s1
      from part ;
 
 -- 42. testUnboundedFollowingForRows
 select p_mfgr, p_name, p_size,
-    sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between current row and unbounded following) as s1
+    round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between current row and unbounded following),2) as s1
     from part ;
 
 -- 43. testUnboundedFollowingForRange
 select p_mfgr, p_name, p_size,
-    sum(p_retailprice) over (distribute by p_mfgr sort by p_size range between current row and unbounded following) as s1
+    round(sum(p_retailprice) over (distribute by p_mfgr sort by p_size range between current row and unbounded following),2) as s1
     from part ;
         
 -- 44. testOverNoPartitionSingleAggregate
@@ -430,8 +430,8 @@ where p_mfgr = 'Manufacturer#6'
 ;
 
 -- 46. window sz is same as partition sz
-select p_retailprice, avg(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following), 
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following) 
+select p_retailprice, round(avg(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),2), 
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),2) 
 from part 
 where p_mfgr='Manufacturer#1';
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/test/queries/clientpositive/ptf.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/ptf.q b/ql/src/test/queries/clientpositive/ptf.q
index b5b271b..5ab6cdc 100644
--- a/ql/src/test/queries/clientpositive/ptf.q
+++ b/ql/src/test/queries/clientpositive/ptf.q
@@ -6,7 +6,7 @@ explain
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part 
   partition by p_mfgr
   order by p_name
@@ -15,7 +15,7 @@ from noop(on part
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part 
   partition by p_mfgr
   order by p_name
@@ -54,7 +54,7 @@ explain
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part 
   partition by p_mfgr
   order by p_name
@@ -63,7 +63,7 @@ from noop(on part
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part 
   partition by p_mfgr
   order by p_name
@@ -162,7 +162,7 @@ explain
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noopwithmap(on part 
   partition by p_mfgr
   order by p_name);
@@ -170,7 +170,7 @@ from noopwithmap(on part
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noopwithmap(on part 
   partition by p_mfgr
   order by p_name);
@@ -180,7 +180,7 @@ explain
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part
 partition by p_mfgr
 order by p_name)
@@ -189,7 +189,7 @@ order by p_name)
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part
 partition by p_mfgr
 order by p_name)
@@ -200,7 +200,7 @@ explain
 select p_mfgr, p_name, p_size, 
 rank() over (partition by p_mfgr order by p_name) as r, 
 dense_rank() over (partition by p_mfgr order by p_name) as dr, 
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row)  as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on noopwithmap(on noop(on part 
 partition by p_mfgr 
 order by p_mfgr DESC, p_name
@@ -209,7 +209,7 @@ order by p_mfgr DESC, p_name
 select p_mfgr, p_name, p_size, 
 rank() over (partition by p_mfgr order by p_name) as r, 
 dense_rank() over (partition by p_mfgr order by p_name) as dr, 
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row)  as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on noopwithmap(on noop(on part 
 partition by p_mfgr 
 order by p_mfgr DESC, p_name
@@ -222,7 +222,7 @@ sub1.cd, sub1.s1
 from (select p_mfgr, p_name, 
 count(p_size) over (partition by p_mfgr order by p_name) as cd, 
 p_retailprice, 
-sum(p_retailprice) over w1  as s1
+round(sum(p_retailprice) over w1,2) as s1
 from noop(on part 
 partition by p_mfgr 
 order by p_name) 
@@ -234,7 +234,7 @@ sub1.cd, sub1.s1
 from (select p_mfgr, p_name, 
 count(p_size) over (partition by p_mfgr order by p_name) as cd, 
 p_retailprice, 
-sum(p_retailprice) over w1  as s1
+round(sum(p_retailprice) over w1,2)  as s1
 from noop(on part 
 partition by p_mfgr 
 order by p_name) 
@@ -247,7 +247,7 @@ select abc.p_mfgr, abc.p_name,
 rank() over (distribute by abc.p_mfgr sort by abc.p_name) as r, 
 dense_rank() over (distribute by abc.p_mfgr sort by abc.p_name) as dr, 
 count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd, 
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1, 
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
 abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over (distribute by abc.p_mfgr sort by abc.p_name) as deltaSz 
 from noop(on part 
 partition by p_mfgr 
@@ -258,8 +258,8 @@ order by p_name
 select abc.p_mfgr, abc.p_name, 
 rank() over (distribute by abc.p_mfgr sort by abc.p_name) as r, 
 dense_rank() over (distribute by abc.p_mfgr sort by abc.p_name) as dr, 
-count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd, 
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1, 
+count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd,
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
 abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over (distribute by abc.p_mfgr sort by abc.p_name) as deltaSz 
 from noop(on part 
 partition by p_mfgr 
@@ -283,20 +283,20 @@ order by p_name);
 -- 16. testViewAsTableInputToPTF
 create view IF NOT EXISTS mfgr_price_view as 
 select p_mfgr, p_brand, 
-sum(p_retailprice) as s 
+round(sum(p_retailprice),2) as s
 from part 
 group by p_mfgr, p_brand;
 
 explain
 select p_mfgr, p_brand, s, 
-sum(s) over w1  as s1
+round(sum(s) over w1,2)  as s1
 from noop(on mfgr_price_view 
 partition by p_mfgr 
 order by p_mfgr)  
 window w1 as ( partition by p_mfgr order by p_brand rows between 2 preceding and current row);
 
 select p_mfgr, p_brand, s, 
-sum(s) over w1  as s1
+round(sum(s) over w1,2) as s1
 from noop(on mfgr_price_view 
 partition by p_mfgr 
 order by p_mfgr)  
@@ -328,7 +328,7 @@ order by p_name)
 INSERT OVERWRITE TABLE part_4 select p_mfgr, p_name, p_size, 
 rank() over (distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over (distribute by p_mfgr sort by p_name) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row)  as s  
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2)  as s
 INSERT OVERWRITE TABLE part_5 select  p_mfgr,p_name, p_size,  
 round(sum(p_size) over (distribute by p_mfgr sort by p_size range between 5 preceding and current row),1) as s2,
 rank() over (distribute by p_mfgr sort by p_mfgr, p_name) as r, 
@@ -343,7 +343,7 @@ order by p_name)
 INSERT OVERWRITE TABLE part_4 select p_mfgr, p_name, p_size, 
 rank() over (distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over (distribute by p_mfgr sort by p_name) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row)  as s  
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2)  as s
 INSERT OVERWRITE TABLE part_5 select  p_mfgr,p_name, p_size,  
 round(sum(p_size) over (distribute by p_mfgr sort by p_size range between 5 preceding and current row),1) as s2,
 rank() over (distribute by p_mfgr sort by p_mfgr, p_name) as r, 

http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/test/queries/clientpositive/vectorized_ptf.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vectorized_ptf.q b/ql/src/test/queries/clientpositive/vectorized_ptf.q
index 64082e9..db2dbe1 100644
--- a/ql/src/test/queries/clientpositive/vectorized_ptf.q
+++ b/ql/src/test/queries/clientpositive/vectorized_ptf.q
@@ -46,7 +46,7 @@ explain extended
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part_orc 
   partition by p_mfgr
   order by p_name
@@ -97,7 +97,7 @@ explain extended
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part_orc 
   partition by p_mfgr
   order by p_name
@@ -106,7 +106,7 @@ from noop(on part_orc
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part_orc 
   partition by p_mfgr
   order by p_name
@@ -211,7 +211,7 @@ explain extended
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noopwithmap(on part_orc 
   partition by p_mfgr
   order by p_name);
@@ -219,7 +219,7 @@ from noopwithmap(on part_orc
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noopwithmap(on part_orc 
   partition by p_mfgr
   order by p_name);
@@ -230,7 +230,7 @@ explain extended
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part_orc
 partition by p_mfgr
 order by p_name)
@@ -239,7 +239,7 @@ order by p_name)
 select p_mfgr, p_name, p_size,
 rank() over (partition by p_mfgr order by p_name) as r,
 dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on part_orc
 partition by p_mfgr
 order by p_name)
@@ -251,7 +251,7 @@ explain extended
 select p_mfgr, p_name, p_size, 
 rank() over (partition by p_mfgr order by p_name) as r, 
 dense_rank() over (partition by p_mfgr order by p_name) as dr, 
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row)  as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on noopwithmap(on noop(on part_orc 
 partition by p_mfgr 
 order by p_mfgr, p_name
@@ -260,7 +260,7 @@ order by p_mfgr, p_name
 select p_mfgr, p_name, p_size, 
 rank() over (partition by p_mfgr order by p_name) as r, 
 dense_rank() over (partition by p_mfgr order by p_name) as dr, 
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row)  as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
 from noop(on noopwithmap(on noop(on part_orc 
 partition by p_mfgr 
 order by p_mfgr, p_name
@@ -274,7 +274,7 @@ sub1.cd, sub1.s1
 from (select p_mfgr, p_name, 
 count(p_size) over (partition by p_mfgr order by p_name) as cd, 
 p_retailprice, 
-sum(p_retailprice) over w1  as s1
+round(sum(p_retailprice) over w1,2) as s1
 from noop(on part_orc 
 partition by p_mfgr 
 order by p_name) 
@@ -286,7 +286,7 @@ sub1.cd, sub1.s1
 from (select p_mfgr, p_name, 
 count(p_size) over (partition by p_mfgr order by p_name) as cd, 
 p_retailprice, 
-sum(p_retailprice) over w1  as s1
+round(sum(p_retailprice) over w1,2) as s1
 from noop(on part_orc 
 partition by p_mfgr 
 order by p_name) 
@@ -300,7 +300,7 @@ select abc.p_mfgr, abc.p_name,
 rank() over (distribute by abc.p_mfgr sort by abc.p_name) as r, 
 dense_rank() over (distribute by abc.p_mfgr sort by abc.p_name) as dr, 
 count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd, 
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1, 
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
 abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over (distribute by abc.p_mfgr sort by abc.p_name) as deltaSz 
 from noop(on part_orc 
 partition by p_mfgr 
@@ -312,7 +312,7 @@ select abc.p_mfgr, abc.p_name,
 rank() over (distribute by abc.p_mfgr sort by abc.p_name) as r, 
 dense_rank() over (distribute by abc.p_mfgr sort by abc.p_name) as dr, 
 count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd, 
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1, 
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
 abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over (distribute by abc.p_mfgr sort by abc.p_name) as deltaSz 
 from noop(on part_orc 
 partition by p_mfgr 
@@ -337,20 +337,20 @@ order by p_name);
 -- 16. testViewAsTableInputToPTF
 create view IF NOT EXISTS mfgr_price_view as 
 select p_mfgr, p_brand, 
-sum(p_retailprice) as s 
+round(sum(p_retailprice),2) as s
 from part_orc 
 group by p_mfgr, p_brand;
 
 explain extended
 select p_mfgr, p_brand, s, 
-sum(s) over w1  as s1
+round(sum(s) over w1,2) as s1
 from noop(on mfgr_price_view 
 partition by p_mfgr 
 order by p_mfgr)  
 window w1 as ( partition by p_mfgr order by p_brand rows between 2 preceding and current row);
 
 select p_mfgr, p_brand, s, 
-sum(s) over w1  as s1
+round(sum(s) over w1,2) as s1
 from noop(on mfgr_price_view 
 partition by p_mfgr 
 order by p_mfgr)  
@@ -382,7 +382,7 @@ order by p_name)
 INSERT OVERWRITE TABLE part_4 select p_mfgr, p_name, p_size, 
 rank() over (distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over (distribute by p_mfgr sort by p_name) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row)  as s  
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
 INSERT OVERWRITE TABLE part_5 select  p_mfgr,p_name, p_size,  
 round(sum(p_size) over (distribute by p_mfgr sort by p_size range between 5 preceding and current row),1) as s2,
 rank() over (distribute by p_mfgr sort by p_mfgr, p_name) as r, 
@@ -397,7 +397,7 @@ order by p_name)
 INSERT OVERWRITE TABLE part_4 select p_mfgr, p_name, p_size, 
 rank() over (distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over (distribute by p_mfgr sort by p_name) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row)  as s  
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
 INSERT OVERWRITE TABLE part_5 select  p_mfgr,p_name, p_size,  
 round(sum(p_size) over (distribute by p_mfgr sort by p_size range between 5 preceding and current row),1) as s2,
 rank() over (distribute by p_mfgr sort by p_mfgr, p_name) as r, 

http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/test/queries/clientpositive/windowing.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing.q b/ql/src/test/queries/clientpositive/windowing.q
index e60a6ef..19fa1ad 100644
--- a/ql/src/test/queries/clientpositive/windowing.q
+++ b/ql/src/test/queries/clientpositive/windowing.q
@@ -6,7 +6,7 @@ set mapred.reduce.tasks=4;
 select p_mfgr, p_name, p_size,
 rank() over(distribute by p_mfgr sort by p_name) as r,
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
 from part
 ;
 
@@ -41,7 +41,7 @@ select p_mfgr, p_name,
 rank() over(distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr, 
 count(p_size) over(distribute by p_mfgr sort by p_name) as cd, 
-p_retailprice, sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1, 
+p_retailprice, round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1,
 p_size, p_size - lag(p_size,1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz 
 from part 
 ;
@@ -52,7 +52,7 @@ from (select p_mfgr, p_name,
 rank() over(distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr, 
 count(p_size) over(distribute by p_mfgr sort by p_name) as cd, 
-p_retailprice, sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1, 
+p_retailprice, round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1,
 p_size, p_size - lag(p_size,1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz 
 from part 
 ) sub1;
@@ -61,7 +61,7 @@ from part
 select abc.p_mfgr, abc.p_name, 
 rank() over(distribute by abc.p_mfgr sort by abc.p_name) as r, 
 dense_rank() over(distribute by abc.p_mfgr sort by abc.p_name) as dr, 
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1, 
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
 abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over(distribute by abc.p_mfgr sort by abc.p_name) as deltaSz 
 from noop(on part 
 partition by p_mfgr 
@@ -79,7 +79,7 @@ from part
 select p_mfgr, p_name, p_size, 
 rank() over(distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row)  as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2)  as s1
 from part  
 ; 
 
@@ -87,7 +87,7 @@ from part
 select p_mfgr, p_name, p_size, 
 rank() over(distribute by p_mfgr sort by p_name) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1 
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
 from part 
 ;
 
@@ -159,19 +159,19 @@ window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 precedi
 
 -- 18. testUDAFs
 select  p_mfgr,p_name, p_size, 
-sum(p_retailprice) over w1 as s, 
+round(sum(p_retailprice) over w1,2) as s,
 min(p_retailprice) over w1 as mi,
 max(p_retailprice) over w1 as ma,
-avg(p_retailprice) over w1 as ag
+round(avg(p_retailprice) over w1,2) as ag
 from part
 window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 preceding and 2 following);
 
 -- 19. testUDAFsWithGBY
 select  p_mfgr,p_name, p_size, p_retailprice, 
-sum(p_retailprice) over w1 as s, 
+round(sum(p_retailprice) over w1,2) as s,
 min(p_retailprice) as mi ,
 max(p_retailprice) as ma ,
-avg(p_retailprice) over w1 as ag
+round(avg(p_retailprice) over w1,2) as ag
 from part
 group by p_mfgr,p_name, p_size, p_retailprice
 window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 preceding and 2 following);
@@ -219,7 +219,7 @@ window w1 as (distribute by p_mfgr sort by p_brand rows between 2 preceding and
 -- 23. testCreateViewWithWindowingQuery
 create view IF NOT EXISTS mfgr_brand_price_view as 
 select p_mfgr, p_brand, 
-sum(p_retailprice) over w1  as s
+round(sum(p_retailprice) over w1,2) as s
 from part 
 window w1 as (distribute by p_mfgr sort by p_name rows between 2 preceding and current row);
         
@@ -264,7 +264,7 @@ INSERT OVERWRITE TABLE part_1
 select p_mfgr, p_name, p_size, 
 rank() over(distribute by p_mfgr sort by p_name ) as r, 
 dense_rank() over(distribute by p_mfgr sort by p_name ) as dr, 
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row)  as s
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
 INSERT OVERWRITE TABLE part_2 
 select  p_mfgr,p_name, p_size,  
 rank() over(distribute by p_mfgr sort by p_name) as r, 
@@ -389,7 +389,7 @@ from part;
 
 -- 38. testPartitioningVariousForms2
 select p_mfgr, p_name, p_size,
-sum(p_retailprice) over (partition by p_mfgr, p_name order by p_mfgr, p_name rows between unbounded preceding and current row) as s1,
+round(sum(p_retailprice) over (partition by p_mfgr, p_name order by p_mfgr, p_name rows between unbounded preceding and current row),2) as s1,
 min(p_retailprice) over (distribute by p_mfgr, p_name sort by p_mfgr, p_name rows between unbounded preceding and current row) as s2,
 max(p_retailprice) over (partition by p_mfgr, p_name order by p_name) as s3
 from part;
@@ -401,22 +401,22 @@ from part;
 
 -- 40. testNoBetweenForRows
 select p_mfgr, p_name, p_size,
-    sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows unbounded preceding) as s1
+    round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows unbounded preceding),2) as s1
      from part ;
 
 -- 41. testNoBetweenForRange
 select p_mfgr, p_name, p_size,
-    sum(p_retailprice) over (distribute by p_mfgr sort by p_size range unbounded preceding) as s1
+    round(sum(p_retailprice) over (distribute by p_mfgr sort by p_size range unbounded preceding),2) as s1
      from part ;
 
 -- 42. testUnboundedFollowingForRows
 select p_mfgr, p_name, p_size,
-    sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between current row and unbounded following) as s1
+    round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between current row and unbounded following),2) as s1
     from part ;
 
 -- 43. testUnboundedFollowingForRange
 select p_mfgr, p_name, p_size,
-    sum(p_retailprice) over (distribute by p_mfgr sort by p_size range between current row and unbounded following) as s1
+    round(sum(p_retailprice) over (distribute by p_mfgr sort by p_size range between current row and unbounded following),2) as s1
     from part ;
         
 -- 44. testOverNoPartitionSingleAggregate
@@ -433,8 +433,8 @@ where p_mfgr = 'Manufacturer#6'
 ;
 
 -- 46. window sz is same as partition sz
-select p_retailprice, avg(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following), 
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following) 
+select p_retailprice, round(avg(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),2),
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),2)
 from part 
 where p_mfgr='Manufacturer#1';