You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ai...@apache.org on 2017/01/12 14:25:56 UTC
[12/13] hive git commit: HIVE-15520: Improve the sum performance for
Range based window (Aihua Xu, reviewed by Yongzhi Chen)
http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
index 2fdb492..86783be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/WindowingTableFunction.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.udf.ptf;
import java.util.AbstractList;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -28,25 +27,19 @@ import java.util.Map;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
-import org.apache.hadoop.hive.ql.exec.PTFOperator;
import org.apache.hadoop.hive.ql.exec.PTFPartition;
import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator;
import org.apache.hadoop.hive.ql.exec.PTFRollingPartition;
import org.apache.hadoop.hive.ql.exec.WindowFunctionInfo;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.WindowingSpec.BoundarySpec;
-import org.apache.hadoop.hive.ql.parse.WindowingSpec.Direction;
import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowType;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.ptf.BoundaryDef;
-import org.apache.hadoop.hive.ql.plan.ptf.OrderDef;
-import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef;
import org.apache.hadoop.hive.ql.plan.ptf.PTFExpressionDef;
import org.apache.hadoop.hive.ql.plan.ptf.PartitionedTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.ptf.WindowFrameDef;
@@ -61,7 +54,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,16 +104,16 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
- boolean processWindow = processWindow(wFn);
+ boolean processWindow = processWindow(wFn.getWindowFrame());
pItr.reset();
if ( !processWindow ) {
- Object out = evaluateWindowFunction(wFn, pItr);
+ Object out = evaluateFunctionOnPartition(wFn, iPart);
if ( !wFn.isPivotResult()) {
out = new SameList(iPart.size(), out);
}
oColumns.add((List<?>)out);
} else {
- oColumns.add(executeFnwithWindow(getQueryDef(), wFn, iPart));
+ oColumns.add(executeFnwithWindow(wFn, iPart));
}
}
@@ -147,30 +139,36 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
}
}
- Object evaluateWindowFunction(WindowFunctionDef wFn,
- PTFPartitionIterator<Object> pItr) throws HiveException {
- GenericUDAFEvaluator fEval = wFn.getWFnEval();
- Object[] args = new Object[wFn.getArgs() == null ? 0 : wFn.getArgs().size()];
- AggregationBuffer aggBuffer = fEval.getNewAggregationBuffer();
- while(pItr.hasNext())
- {
- Object row = pItr.next();
- int i =0;
- if ( wFn.getArgs() != null ) {
- for(PTFExpressionDef arg : wFn.getArgs())
- {
- args[i++] = arg.getExprEvaluator().evaluate(row);
- }
- }
- fEval.aggregate(aggBuffer, args);
+ // Evaluate the window function for a single row (rowToProcess) of the given partition
+ private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)
+ throws HiveException {
+ BasePartitionEvaluator partitionEval = wFn.getWFnEval()
+ .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI());
+ return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo());
+ }
+
+ // Evaluate the window function over the whole partition (no per-row window frame)
+ private Object evaluateFunctionOnPartition(WindowFunctionDef wFn,
+ PTFPartition partition) throws HiveException {
+ BasePartitionEvaluator partitionEval = wFn.getWFnEval()
+ .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI());
+ return partitionEval.getPartitionAgg();
+ }
+
+ // Evaluate the function result for each row in the partition
+ ArrayList<Object> executeFnwithWindow(
+ WindowFunctionDef wFnDef,
+ PTFPartition iPart)
+ throws HiveException {
+ ArrayList<Object> vals = new ArrayList<Object>();
+ for(int i=0; i < iPart.size(); i++) {
+ Object out = evaluateWindowFunction(wFnDef, i, iPart);
+ vals.add(out);
}
- Object out = fEval.evaluate(aggBuffer);
- out = ObjectInspectorUtils.copyToStandardObject(out, wFn.getOI());
- return out;
+ return vals;
}
- private boolean processWindow(WindowFunctionDef wFn) {
- WindowFrameDef frame = wFn.getWindowFrame();
+ private static boolean processWindow(WindowFrameDef frame) {
if ( frame == null ) {
return false;
}
@@ -391,7 +389,7 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
streamingState.rollingPart.append(row);
- WindowTableFunctionDef tabDef = (WindowTableFunctionDef) getTableDef();
+ WindowTableFunctionDef tabDef = (WindowTableFunctionDef) tableDef;
for (int i = 0; i < tabDef.getWindowFunctions().size(); i++) {
WindowFunctionDef wFn = tabDef.getWindowFunctions().get(i);
@@ -417,10 +415,7 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
} else {
int rowToProcess = streamingState.rollingPart.rowToProcess(wFn.getWindowFrame());
if (rowToProcess >= 0) {
- Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart);
- PTFPartitionIterator<Object> rItr = rng.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
- Object out = evaluateWindowFunction(wFn, rItr);
+ Object out = evaluateWindowFunction(wFn, rowToProcess, streamingState.rollingPart);
streamingState.fnOutputs[i].add(out);
}
}
@@ -495,10 +490,7 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
while (numRowsRemaining > 0) {
int rowToProcess = streamingState.rollingPart.size() - numRowsRemaining;
if (rowToProcess >= 0) {
- Range rng = getRange(wFn, rowToProcess, streamingState.rollingPart);
- PTFPartitionIterator<Object> rItr = rng.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
- Object out = evaluateWindowFunction(wFn, rItr);
+ Object out = evaluateWindowFunction(wFn, rowToProcess, streamingState.rollingPart);
streamingState.fnOutputs[i].add(out);
}
numRowsRemaining--;
@@ -540,10 +532,10 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
int i=0;
for(WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
- boolean processWindow = processWindow(wFn);
+ boolean processWindow = processWindow(wFn.getWindowFrame());
pItr.reset();
if ( !processWindow && !wFn.isPivotResult() ) {
- Object out = evaluateWindowFunction(wFn, pItr);
+ Object out = evaluateFunctionOnPartition(wFn, iPart);
output.add(out);
} else if (wFn.isPivotResult()) {
GenericUDAFEvaluator streamingEval = wFn.getWFnEval().getWindowingEvaluator(wFn.getWindowFrame());
@@ -558,12 +550,11 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
output.add(null);
wFnsWithWindows.add(i);
} else {
- outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn,
- pItr);
+ outputFromPivotFunctions[i] = (List) evaluateFunctionOnPartition(wFn, iPart);
output.add(null);
}
} else {
- outputFromPivotFunctions[i] = (List) evaluateWindowFunction(wFn, pItr);
+ outputFromPivotFunctions[i] = (List) evaluateFunctionOnPartition(wFn, iPart);
output.add(null);
}
} else {
@@ -652,797 +643,6 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
}
- ArrayList<Object> executeFnwithWindow(PTFDesc ptfDesc,
- WindowFunctionDef wFnDef,
- PTFPartition iPart)
- throws HiveException {
- ArrayList<Object> vals = new ArrayList<Object>();
- for(int i=0; i < iPart.size(); i++) {
- Range rng = getRange(wFnDef, i, iPart);
- PTFPartitionIterator<Object> rItr = rng.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
- Object out = evaluateWindowFunction(wFnDef, rItr);
- vals.add(out);
- }
- return vals;
- }
-
- private Range getRange(WindowFunctionDef wFnDef, int currRow, PTFPartition p) throws HiveException
- {
- WindowFrameDef winFrame = wFnDef.getWindowFrame();
- BoundaryDef startB = winFrame.getStart();
- BoundaryDef endB = winFrame.getEnd();
-
- int start, end;
- if (winFrame.getWindowType() == WindowType.ROWS) {
- start = getRowBoundaryStart(startB, currRow);
- end = getRowBoundaryEnd(endB, currRow, p);
- } else {
- ValueBoundaryScanner vbs = ValueBoundaryScanner.getScanner(winFrame);
- start = vbs.computeStart(currRow, p);
- end = vbs.computeEnd(currRow, p);
- }
- start = start < 0 ? 0 : start;
- end = end > p.size() ? p.size() : end;
- return new Range(start, end, p);
- }
-
- private int getRowBoundaryStart(BoundaryDef b, int currRow) throws HiveException {
- Direction d = b.getDirection();
- int amt = b.getAmt();
- switch(d) {
- case PRECEDING:
- if (amt == BoundarySpec.UNBOUNDED_AMOUNT) {
- return 0;
- }
- else {
- return currRow - amt;
- }
- case CURRENT:
- return currRow;
- case FOLLOWING:
- return currRow + amt;
- }
- throw new HiveException("Unknown Start Boundary Direction: " + d);
- }
-
- private int getRowBoundaryEnd(BoundaryDef b, int currRow, PTFPartition p) throws HiveException {
- Direction d = b.getDirection();
- int amt = b.getAmt();
- switch(d) {
- case PRECEDING:
- if ( amt == 0 ) {
- return currRow + 1;
- }
- return currRow - amt + 1;
- case CURRENT:
- return currRow + 1;
- case FOLLOWING:
- if (amt == BoundarySpec.UNBOUNDED_AMOUNT) {
- return p.size();
- }
- else {
- return currRow + amt + 1;
- }
- }
- throw new HiveException("Unknown End Boundary Direction: " + d);
- }
-
- static class Range
- {
- int start;
- int end;
- PTFPartition p;
-
- public Range(int start, int end, PTFPartition p)
- {
- super();
- this.start = start;
- this.end = end;
- this.p = p;
- }
-
- public PTFPartitionIterator<Object> iterator()
- {
- return p.range(start, end);
- }
- }
-
-
- static abstract class ValueBoundaryScanner {
- BoundaryDef start, end;
-
- public ValueBoundaryScanner(BoundaryDef start, BoundaryDef end) {
- this.start = start;
- this.end = end;
- }
-
- protected abstract int computeStart(int rowIdx, PTFPartition p) throws HiveException;
-
- protected abstract int computeEnd(int rowIdx, PTFPartition p) throws HiveException;
-
- public static ValueBoundaryScanner getScanner(WindowFrameDef winFrameDef)
- throws HiveException {
- OrderDef orderDef = winFrameDef.getOrderDef();
- int numOrders = orderDef.getExpressions().size();
- if (numOrders != 1) {
- return new MultiValueBoundaryScanner(winFrameDef.getStart(), winFrameDef.getEnd(), orderDef);
- } else {
- return SingleValueBoundaryScanner.getScanner(winFrameDef.getStart(), winFrameDef.getEnd(), orderDef);
- }
- }
- }
-
- /*
- * - starting from the given rowIdx scan in the given direction until a row's expr
- * evaluates to an amt that crosses the 'amt' threshold specified in the BoundaryDef.
- */
- static abstract class SingleValueBoundaryScanner extends ValueBoundaryScanner {
- OrderExpressionDef expressionDef;
-
- public SingleValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
- super(start, end);
- this.expressionDef = expressionDef;
- }
-
- /*
-| Use | Boundary1.type | Boundary1. amt | Sort Key | Order | Behavior |
-| Case | | | | | |
-|------+----------------+----------------+----------+-------+-----------------------------------|
-| 1. | PRECEDING | UNB | ANY | ANY | start = 0 |
-| 2. | PRECEDING | unsigned int | NULL | ASC | start = 0 |
-| 3. | | | | DESC | scan backwards to row R2 |
-| | | | | | such that R2.sk is not null |
-| | | | | | start = R2.idx + 1 |
-| 4. | PRECEDING | unsigned int | not NULL | DESC | scan backwards until row R2 |
-| | | | | | such that R2.sk - R.sk > amt |
-| | | | | | start = R2.idx + 1 |
-| 5. | PRECEDING | unsigned int | not NULL | ASC | scan backward until row R2 |
-| | | | | | such that R.sk - R2.sk > bnd1.amt |
-| | | | | | start = R2.idx + 1 |
-| 6. | CURRENT ROW | | NULL | ANY | scan backwards until row R2 |
-| | | | | | such that R2.sk is not null |
-| | | | | | start = R2.idx + 1 |
-| 7. | CURRENT ROW | | not NULL | ANY | scan backwards until row R2 |
-| | | | | | such R2.sk != R.sk |
-| | | | | | start = R2.idx + 1 |
-| 8. | FOLLOWING | UNB | ANY | ANY | Error |
-| 9. | FOLLOWING | unsigned int | NULL | DESC | start = partition.size |
-| 10. | | | | ASC | scan forward until R2 |
-| | | | | | such that R2.sk is not null |
-| | | | | | start = R2.idx |
-| 11. | FOLLOWING | unsigned int | not NULL | DESC | scan forward until row R2 |
-| | | | | | such that R.sk - R2.sk > amt |
-| | | | | | start = R2.idx |
-| 12. | | | | ASC | scan forward until row R2 |
-| | | | | | such that R2.sk - R.sk > amt |
-|------+----------------+----------------+----------+-------+-----------------------------------|
- */
- @Override
- protected int computeStart(int rowIdx, PTFPartition p) throws HiveException {
- switch(start.getDirection()) {
- case PRECEDING:
- return computeStartPreceding(rowIdx, p);
- case CURRENT:
- return computeStartCurrentRow(rowIdx, p);
- case FOLLOWING:
- default:
- return computeStartFollowing(rowIdx, p);
- }
- }
-
- protected int computeStartPreceding(int rowIdx, PTFPartition p) throws HiveException {
- int amt = start.getAmt();
- // Use Case 1.
- if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
- return 0;
- }
- Object sortKey = computeValue(p.getAt(rowIdx));
-
- if ( sortKey == null ) {
- // Use Case 2.
- if ( expressionDef.getOrder() == Order.ASC ) {
- return 0;
- }
- else { // Use Case 3.
- while ( sortKey == null && rowIdx >= 0 ) {
- --rowIdx;
- if ( rowIdx >= 0 ) {
- sortKey = computeValue(p.getAt(rowIdx));
- }
- }
- return rowIdx+1;
- }
- }
-
- Object rowVal = sortKey;
- int r = rowIdx;
-
- // Use Case 4.
- if ( expressionDef.getOrder() == Order.DESC ) {
- while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
- r--;
- if ( r >= 0 ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r + 1;
- }
- else { // Use Case 5.
- while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
- r--;
- if ( r >= 0 ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r + 1;
- }
- }
-
- protected int computeStartCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
- Object sortKey = computeValue(p.getAt(rowIdx));
-
- // Use Case 6.
- if ( sortKey == null ) {
- while ( sortKey == null && rowIdx >= 0 ) {
- --rowIdx;
- if ( rowIdx >= 0 ) {
- sortKey = computeValue(p.getAt(rowIdx));
- }
- }
- return rowIdx+1;
- }
-
- Object rowVal = sortKey;
- int r = rowIdx;
-
- // Use Case 7.
- while (r >= 0 && isEqual(rowVal, sortKey) ) {
- r--;
- if ( r >= 0 ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r + 1;
- }
-
- protected int computeStartFollowing(int rowIdx, PTFPartition p) throws HiveException {
- int amt = start.getAmt();
- Object sortKey = computeValue(p.getAt(rowIdx));
-
- Object rowVal = sortKey;
- int r = rowIdx;
-
- if ( sortKey == null ) {
- // Use Case 9.
- if ( expressionDef.getOrder() == Order.DESC) {
- return p.size();
- }
- else { // Use Case 10.
- while (r < p.size() && rowVal == null ) {
- r++;
- if ( r < p.size() ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r;
- }
- }
-
- // Use Case 11.
- if ( expressionDef.getOrder() == Order.DESC) {
- while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
- r++;
- if ( r < p.size() ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r;
- }
- else { // Use Case 12.
- while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
- r++;
- if ( r < p.size() ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r;
- }
- }
-
- /*
-| Use | Boundary2.type | Boundary2.amt | Sort Key | Order | Behavior |
-| Case | | | | | |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| 1. | PRECEDING | UNB | ANY | ANY | Error |
-| 2. | PRECEDING | unsigned int | NULL | DESC | end = partition.size() |
-| 3. | | | | ASC | end = 0 |
-| 4. | PRECEDING | unsigned int | not null | DESC | scan backward until row R2 |
-| | | | | | such that R2.sk - R.sk > bnd.amt |
-| | | | | | end = R2.idx + 1 |
-| 5. | PRECEDING | unsigned int | not null | ASC | scan backward until row R2 |
-| | | | | | such that R.sk - R2.sk > bnd.amt |
-| | | | | | end = R2.idx + 1 |
-| 6. | CURRENT ROW | | NULL | ANY | scan forward until row R2 |
-| | | | | | such that R2.sk is not null |
-| | | | | | end = R2.idx |
-| 7. | CURRENT ROW | | not null | ANY | scan forward until row R2 |
-| | | | | | such that R2.sk != R.sk |
-| | | | | | end = R2.idx |
-| 8. | FOLLOWING | UNB | ANY | ANY | end = partition.size() |
-| 9. | FOLLOWING | unsigned int | NULL | DESC | end = partition.size() |
-| 10. | | | | ASC | scan forward until row R2 |
-| | | | | | such that R2.sk is not null |
-| | | | | | end = R2.idx |
-| 11. | FOLLOWING | unsigned int | not NULL | DESC | scan forward until row R2 |
-| | | | | | such R.sk - R2.sk > bnd.amt |
-| | | | | | end = R2.idx |
-| 12. | | | | ASC | scan forward until row R2 |
-| | | | | | such R2.sk - R2.sk > bnd.amt |
-| | | | | | end = R2.idx |
-|------+----------------+---------------+----------+-------+-----------------------------------|
- */
- @Override
- protected int computeEnd(int rowIdx, PTFPartition p) throws HiveException {
- switch(end.getDirection()) {
- case PRECEDING:
- return computeEndPreceding(rowIdx, p);
- case CURRENT:
- return computeEndCurrentRow(rowIdx, p);
- case FOLLOWING:
- default:
- return computeEndFollowing(rowIdx, p);
- }
- }
-
- protected int computeEndPreceding(int rowIdx, PTFPartition p) throws HiveException {
- int amt = end.getAmt();
- // Use Case 1.
- // amt == UNBOUNDED, is caught during translation
-
- Object sortKey = computeValue(p.getAt(rowIdx));
-
- if ( sortKey == null ) {
- // Use Case 2.
- if ( expressionDef.getOrder() == Order.DESC ) {
- return p.size();
- }
- else { // Use Case 3.
- return 0;
- }
- }
-
- Object rowVal = sortKey;
- int r = rowIdx;
-
- // Use Case 4.
- if ( expressionDef.getOrder() == Order.DESC ) {
- while (r >= 0 && !isDistanceGreater(rowVal, sortKey, amt) ) {
- r--;
- if ( r >= 0 ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r + 1;
- }
- else { // Use Case 5.
- while (r >= 0 && !isDistanceGreater(sortKey, rowVal, amt) ) {
- r--;
- if ( r >= 0 ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r + 1;
- }
- }
-
- protected int computeEndCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
- Object sortKey = computeValue(p.getAt(rowIdx));
-
- // Use Case 6.
- if ( sortKey == null ) {
- while ( sortKey == null && rowIdx < p.size() ) {
- ++rowIdx;
- if ( rowIdx < p.size() ) {
- sortKey = computeValue(p.getAt(rowIdx));
- }
- }
- return rowIdx;
- }
-
- Object rowVal = sortKey;
- int r = rowIdx;
-
- // Use Case 7.
- while (r < p.size() && isEqual(sortKey, rowVal) ) {
- r++;
- if ( r < p.size() ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r;
- }
-
- protected int computeEndFollowing(int rowIdx, PTFPartition p) throws HiveException {
- int amt = end.getAmt();
-
- // Use Case 8.
- if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
- return p.size();
- }
- Object sortKey = computeValue(p.getAt(rowIdx));
-
- Object rowVal = sortKey;
- int r = rowIdx;
-
- if ( sortKey == null ) {
- // Use Case 9.
- if ( expressionDef.getOrder() == Order.DESC) {
- return p.size();
- }
- else { // Use Case 10.
- while (r < p.size() && rowVal == null ) {
- r++;
- if ( r < p.size() ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r;
- }
- }
-
- // Use Case 11.
- if ( expressionDef.getOrder() == Order.DESC) {
- while (r < p.size() && !isDistanceGreater(sortKey, rowVal, amt) ) {
- r++;
- if ( r < p.size() ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r;
- }
- else { // Use Case 12.
- while (r < p.size() && !isDistanceGreater(rowVal, sortKey, amt) ) {
- r++;
- if ( r < p.size() ) {
- rowVal = computeValue(p.getAt(r));
- }
- }
- return r;
- }
- }
-
- public Object computeValue(Object row) throws HiveException {
- Object o = expressionDef.getExprEvaluator().evaluate(row);
- return ObjectInspectorUtils.copyToStandardObject(o, expressionDef.getOI());
- }
-
- /**
- * Checks if the distance of v2 to v1 is greater than the given amt.
- * @return True if the value of v1 - v2 is greater than amt or either value is null.
- */
- public abstract boolean isDistanceGreater(Object v1, Object v2, int amt);
-
- /**
- * Checks if the values of v1 or v2 are the same.
- * @return True if both values are the same or both are nulls.
- */
- public abstract boolean isEqual(Object v1, Object v2);
-
-
- @SuppressWarnings("incomplete-switch")
- public static SingleValueBoundaryScanner getScanner(BoundaryDef start, BoundaryDef end, OrderDef orderDef)
- throws HiveException {
- if (orderDef.getExpressions().size() != 1) {
- throw new HiveException("Internal error: initializing SingleValueBoundaryScanner with"
- + " multiple expression for sorting");
- }
- OrderExpressionDef exprDef = orderDef.getExpressions().get(0);
- PrimitiveObjectInspector pOI = (PrimitiveObjectInspector) exprDef.getOI();
- switch(pOI.getPrimitiveCategory()) {
- case BYTE:
- case INT:
- case LONG:
- case SHORT:
- case TIMESTAMP:
- return new LongValueBoundaryScanner(start, end, exprDef);
- case DOUBLE:
- case FLOAT:
- return new DoubleValueBoundaryScanner(start, end, exprDef);
- case DECIMAL:
- return new HiveDecimalValueBoundaryScanner(start, end, exprDef);
- case DATE:
- return new DateValueBoundaryScanner(start, end, exprDef);
- case STRING:
- return new StringValueBoundaryScanner(start, end, exprDef);
- }
- throw new HiveException(
- String.format("Internal Error: attempt to setup a Window for datatype %s",
- pOI.getPrimitiveCategory()));
- }
- }
-
- public static class LongValueBoundaryScanner extends SingleValueBoundaryScanner {
- public LongValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
- super(start, end,expressionDef);
- }
-
- @Override
- public boolean isDistanceGreater(Object v1, Object v2, int amt) {
- if (v1 != null && v2 != null) {
- long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- return (l1 -l2) > amt;
- }
-
- return v1 != null || v2 != null; // True if only one value is null
- }
-
- @Override
- public boolean isEqual(Object v1, Object v2) {
- if (v1 != null && v2 != null) {
- long l1 = PrimitiveObjectInspectorUtils.getLong(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- long l2 = PrimitiveObjectInspectorUtils.getLong(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- return l1 == l2;
- }
-
- return v1 == null && v2 == null; // True if both are null
- }
- }
-
- public static class DoubleValueBoundaryScanner extends SingleValueBoundaryScanner {
- public DoubleValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
- super(start, end,expressionDef);
- }
-
- @Override
- public boolean isDistanceGreater(Object v1, Object v2, int amt) {
- if (v1 != null && v2 != null) {
- double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- return (d1 -d2) > amt;
- }
-
- return v1 != null || v2 != null; // True if only one value is null
- }
-
- @Override
- public boolean isEqual(Object v1, Object v2) {
- if (v1 != null && v2 != null) {
- double d1 = PrimitiveObjectInspectorUtils.getDouble(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- double d2 = PrimitiveObjectInspectorUtils.getDouble(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- return d1 == d2;
- }
-
- return v1 == null && v2 == null; // True if both are null
- }
- }
-
- public static class HiveDecimalValueBoundaryScanner extends SingleValueBoundaryScanner {
- public HiveDecimalValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
- super(start, end,expressionDef);
- }
-
- @Override
- public boolean isDistanceGreater(Object v1, Object v2, int amt) {
- HiveDecimal d1 = PrimitiveObjectInspectorUtils.getHiveDecimal(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- HiveDecimal d2 = PrimitiveObjectInspectorUtils.getHiveDecimal(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- if ( d1 != null && d2 != null ) {
- return d1.subtract(d2).intValue() > amt; // TODO: lossy conversion!
- }
-
- return d1 != null || d2 != null; // True if only one value is null
- }
-
- @Override
- public boolean isEqual(Object v1, Object v2) {
- HiveDecimal d1 = PrimitiveObjectInspectorUtils.getHiveDecimal(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- HiveDecimal d2 = PrimitiveObjectInspectorUtils.getHiveDecimal(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- if ( d1 != null && d2 != null ) {
- return d1.equals(d2);
- }
-
- return d1 == null && d2 == null; // True if both are null
- }
- }
-
- public static class DateValueBoundaryScanner extends SingleValueBoundaryScanner {
- public DateValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
- super(start, end,expressionDef);
- }
-
- @Override
- public boolean isDistanceGreater(Object v1, Object v2, int amt) {
- Date l1 = PrimitiveObjectInspectorUtils.getDate(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- Date l2 = PrimitiveObjectInspectorUtils.getDate(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- if (l1 != null && l2 != null) {
- return (double)(l1.getTime() - l2.getTime())/1000 > (long)amt * 24 * 3600; // Converts amt days to milliseconds
- }
- return l1 != l2; // True if only one date is null
- }
-
- @Override
- public boolean isEqual(Object v1, Object v2) {
- Date l1 = PrimitiveObjectInspectorUtils.getDate(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- Date l2 = PrimitiveObjectInspectorUtils.getDate(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- return (l1 == null && l2 == null) || (l1 != null && l1.equals(l2));
- }
- }
-
- public static class StringValueBoundaryScanner extends SingleValueBoundaryScanner {
- public StringValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderExpressionDef expressionDef) {
- super(start, end,expressionDef);
- }
-
- @Override
- public boolean isDistanceGreater(Object v1, Object v2, int amt) {
- String s1 = PrimitiveObjectInspectorUtils.getString(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- String s2 = PrimitiveObjectInspectorUtils.getString(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- return s1 != null && s2 != null && s1.compareTo(s2) > 0;
- }
-
- @Override
- public boolean isEqual(Object v1, Object v2) {
- String s1 = PrimitiveObjectInspectorUtils.getString(v1,
- (PrimitiveObjectInspector) expressionDef.getOI());
- String s2 = PrimitiveObjectInspectorUtils.getString(v2,
- (PrimitiveObjectInspector) expressionDef.getOI());
- return (s1 == null && s2 == null) || (s1 != null && s1.equals(s2));
- }
- }
-
- /*
- */
- static class MultiValueBoundaryScanner extends ValueBoundaryScanner {
- OrderDef orderDef;
-
- public MultiValueBoundaryScanner(BoundaryDef start, BoundaryDef end, OrderDef orderDef) {
- super(start, end);
- this.orderDef = orderDef;
- }
-
- /*
-|------+----------------+----------------+----------+-------+-----------------------------------|
-| Use | Boundary1.type | Boundary1. amt | Sort Key | Order | Behavior |
-| Case | | | | | |
-|------+----------------+----------------+----------+-------+-----------------------------------|
-| 1. | PRECEDING | UNB | ANY | ANY | start = 0 |
-| 2. | CURRENT ROW | | ANY | ANY | scan backwards until row R2 |
-| | | | | | such R2.sk != R.sk |
-| | | | | | start = R2.idx + 1 |
-|------+----------------+----------------+----------+-------+-----------------------------------|
- */
- @Override
- protected int computeStart(int rowIdx, PTFPartition p) throws HiveException {
- switch(start.getDirection()) {
- case PRECEDING:
- return computeStartPreceding(rowIdx, p);
- case CURRENT:
- return computeStartCurrentRow(rowIdx, p);
- case FOLLOWING:
- default:
- throw new HiveException(
- "FOLLOWING not allowed for starting RANGE with multiple expressions in ORDER BY");
- }
- }
-
- protected int computeStartPreceding(int rowIdx, PTFPartition p) throws HiveException {
- int amt = start.getAmt();
- if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
- return 0;
- }
- throw new HiveException(
- "PRECEDING needs UNBOUNDED for RANGE with multiple expressions in ORDER BY");
- }
-
- protected int computeStartCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
- Object[] sortKey = computeValues(p.getAt(rowIdx));
- Object[] rowVal = sortKey;
- int r = rowIdx;
-
- while (r >= 0 && isEqual(rowVal, sortKey) ) {
- r--;
- if ( r >= 0 ) {
- rowVal = computeValues(p.getAt(r));
- }
- }
- return r + 1;
- }
-
- /*
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| Use | Boundary2.type | Boundary2.amt | Sort Key | Order | Behavior |
-| Case | | | | | |
-|------+----------------+---------------+----------+-------+-----------------------------------|
-| 1. | CURRENT ROW | | ANY | ANY | scan forward until row R2 |
-| | | | | | such that R2.sk != R.sk |
-| | | | | | end = R2.idx |
-| 2. | FOLLOWING | UNB | ANY | ANY | end = partition.size() |
-|------+----------------+---------------+----------+-------+-----------------------------------|
- */
- @Override
- protected int computeEnd(int rowIdx, PTFPartition p) throws HiveException {
- switch(end.getDirection()) {
- case PRECEDING:
- throw new HiveException(
- "PRECEDING not allowed for finishing RANGE with multiple expressions in ORDER BY");
- case CURRENT:
- return computeEndCurrentRow(rowIdx, p);
- case FOLLOWING:
- default:
- return computeEndFollowing(rowIdx, p);
- }
- }
-
- protected int computeEndCurrentRow(int rowIdx, PTFPartition p) throws HiveException {
- Object[] sortKey = computeValues(p.getAt(rowIdx));
- Object[] rowVal = sortKey;
- int r = rowIdx;
-
- while (r < p.size() && isEqual(sortKey, rowVal) ) {
- r++;
- if ( r < p.size() ) {
- rowVal = computeValues(p.getAt(r));
- }
- }
- return r;
- }
-
- protected int computeEndFollowing(int rowIdx, PTFPartition p) throws HiveException {
- int amt = end.getAmt();
- if ( amt == BoundarySpec.UNBOUNDED_AMOUNT ) {
- return p.size();
- }
- throw new HiveException(
- "FOLLOWING needs UNBOUNDED for RANGE with multiple expressions in ORDER BY");
- }
-
- public Object[] computeValues(Object row) throws HiveException {
- Object[] objs = new Object[orderDef.getExpressions().size()];
- for (int i = 0; i < objs.length; i++) {
- Object o = orderDef.getExpressions().get(i).getExprEvaluator().evaluate(row);
- objs[i] = ObjectInspectorUtils.copyToStandardObject(o, orderDef.getExpressions().get(i).getOI());
- }
- return objs;
- }
-
- public boolean isEqual(Object[] v1, Object[] v2) {
- assert v1.length == v2.length;
- for (int i = 0; i < v1.length; i++) {
- if (v1[i] == null && v2[i] == null) {
- continue;
- }
- if (v1[i] == null || v2[i] == null) {
- return false;
- }
- if (ObjectInspectorUtils.compare(
- v1[i], orderDef.getExpressions().get(i).getOI(),
- v2[i], orderDef.getExpressions().get(i).getOI()) != 0) {
- return false;
- }
- }
- return true;
- }
- }
-
public static class SameList<E> extends AbstractList<E> {
int sz;
E val;
@@ -1518,6 +718,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
return currIdx < iPart.size();
}
+ // Given the data in a partition, evaluate the result for the next row for
+ // streaming and batch mode
@Override
public Object next() {
int i;
@@ -1550,10 +752,8 @@ public class WindowingTableFunction extends TableFunctionEvaluator {
}
output.set(j, out);
} else {
- Range rng = getRange(wFn, currIdx, iPart);
- PTFPartitionIterator<Object> rItr = rng.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc, rItr);
- output.set(j, evaluateWindowFunction(wFn, rItr));
+ Object out = evaluateWindowFunction(wFn, currIdx, iPart);
+ output.set(j, out);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q b/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q
index 97f113c..1877927 100644
--- a/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q
+++ b/ql/src/test/queries/clientpositive/cbo_rp_windowing_2.q
@@ -9,7 +9,7 @@ set mapred.reduce.tasks=4;
select p_mfgr, p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
from part
;
@@ -44,7 +44,7 @@ select p_mfgr, p_name,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
count(p_size) over(distribute by p_mfgr sort by p_name) as cd,
-p_retailprice, sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1,
+p_retailprice, round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1,
p_size, p_size - lag(p_size,1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz
from part
;
@@ -55,7 +55,7 @@ from (select p_mfgr, p_name,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
count(p_size) over(distribute by p_mfgr sort by p_name) as cd,
-p_retailprice, sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1,
+p_retailprice, round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1,
p_size, p_size - lag(p_size,1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz
from part
) sub1;
@@ -64,7 +64,7 @@ from part
select abc.p_mfgr, abc.p_name,
rank() over(distribute by abc.p_mfgr sort by abc.p_name) as r,
dense_rank() over(distribute by abc.p_mfgr sort by abc.p_name) as dr,
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1,
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over(distribute by abc.p_mfgr sort by abc.p_name) as deltaSz
from noop(on part
partition by p_mfgr
@@ -82,7 +82,7 @@ from part
select p_mfgr, p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
from part
;
@@ -90,7 +90,7 @@ from part
select p_mfgr, p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
from part
;
@@ -162,19 +162,19 @@ window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 precedi
-- 18. testUDAFs
select p_mfgr,p_name, p_size,
-sum(p_retailprice) over w1 as s,
+round(sum(p_retailprice) over w1,2) as s,
min(p_retailprice) over w1 as mi,
max(p_retailprice) over w1 as ma,
-avg(p_retailprice) over w1 as ag
+round(avg(p_retailprice) over w1,2) as ag
from part
window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 preceding and 2 following);
-- 19. testUDAFsWithGBY
select p_mfgr,p_name, p_size, p_retailprice,
-sum(p_retailprice) over w1 as s,
+round(sum(p_retailprice) over w1,2) as s,
min(p_retailprice) as mi ,
max(p_retailprice) as ma ,
-avg(p_retailprice) over w1 as ag
+round(avg(p_retailprice) over w1,2) as ag
from part
group by p_mfgr,p_name, p_size, p_retailprice
window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 preceding and 2 following);
@@ -222,7 +222,7 @@ window w1 as (distribute by p_mfgr sort by p_brand rows between 2 preceding and
-- 23. testCreateViewWithWindowingQuery
create view IF NOT EXISTS mfgr_brand_price_view as
select p_mfgr, p_brand,
-sum(p_retailprice) over w1 as s
+round(sum(p_retailprice) over w1,2) as s
from part
window w1 as (distribute by p_mfgr sort by p_name rows between 2 preceding and current row);
@@ -267,7 +267,7 @@ INSERT OVERWRITE TABLE part_1
select p_mfgr, p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name ) as r,
dense_rank() over(distribute by p_mfgr sort by p_name ) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
INSERT OVERWRITE TABLE part_2
select p_mfgr,p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name) as r,
@@ -386,7 +386,7 @@ from part;
-- 38. testPartitioningVariousForms2
select p_mfgr, p_name, p_size,
-sum(p_retailprice) over (partition by p_mfgr, p_name order by p_mfgr, p_name rows between unbounded preceding and current row) as s1,
+round(sum(p_retailprice) over (partition by p_mfgr, p_name order by p_mfgr, p_name rows between unbounded preceding and current row),2) as s1,
min(p_retailprice) over (distribute by p_mfgr, p_name sort by p_mfgr, p_name rows between unbounded preceding and current row) as s2,
max(p_retailprice) over (partition by p_mfgr, p_name order by p_name) as s3
from part;
@@ -398,22 +398,22 @@ from part;
-- 40. testNoBetweenForRows
select p_mfgr, p_name, p_size,
- sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows unbounded preceding) as s1
+ round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows unbounded preceding),2) as s1
from part ;
-- 41. testNoBetweenForRange
select p_mfgr, p_name, p_size,
- sum(p_retailprice) over (distribute by p_mfgr sort by p_size range unbounded preceding) as s1
+ round(sum(p_retailprice) over (distribute by p_mfgr sort by p_size range unbounded preceding),2) as s1
from part ;
-- 42. testUnboundedFollowingForRows
select p_mfgr, p_name, p_size,
- sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between current row and unbounded following) as s1
+ round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between current row and unbounded following),2) as s1
from part ;
-- 43. testUnboundedFollowingForRange
select p_mfgr, p_name, p_size,
- sum(p_retailprice) over (distribute by p_mfgr sort by p_size range between current row and unbounded following) as s1
+ round(sum(p_retailprice) over (distribute by p_mfgr sort by p_size range between current row and unbounded following),2) as s1
from part ;
-- 44. testOverNoPartitionSingleAggregate
@@ -430,8 +430,8 @@ where p_mfgr = 'Manufacturer#6'
;
-- 46. window sz is same as partition sz
-select p_retailprice, avg(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following)
+select p_retailprice, round(avg(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),2),
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),2)
from part
where p_mfgr='Manufacturer#1';
http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/test/queries/clientpositive/ptf.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/ptf.q b/ql/src/test/queries/clientpositive/ptf.q
index b5b271b..5ab6cdc 100644
--- a/ql/src/test/queries/clientpositive/ptf.q
+++ b/ql/src/test/queries/clientpositive/ptf.q
@@ -6,7 +6,7 @@ explain
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part
partition by p_mfgr
order by p_name
@@ -15,7 +15,7 @@ from noop(on part
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part
partition by p_mfgr
order by p_name
@@ -54,7 +54,7 @@ explain
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part
partition by p_mfgr
order by p_name
@@ -63,7 +63,7 @@ from noop(on part
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part
partition by p_mfgr
order by p_name
@@ -162,7 +162,7 @@ explain
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noopwithmap(on part
partition by p_mfgr
order by p_name);
@@ -170,7 +170,7 @@ from noopwithmap(on part
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noopwithmap(on part
partition by p_mfgr
order by p_name);
@@ -180,7 +180,7 @@ explain
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part
partition by p_mfgr
order by p_name)
@@ -189,7 +189,7 @@ order by p_name)
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part
partition by p_mfgr
order by p_name)
@@ -200,7 +200,7 @@ explain
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on noopwithmap(on noop(on part
partition by p_mfgr
order by p_mfgr DESC, p_name
@@ -209,7 +209,7 @@ order by p_mfgr DESC, p_name
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on noopwithmap(on noop(on part
partition by p_mfgr
order by p_mfgr DESC, p_name
@@ -222,7 +222,7 @@ sub1.cd, sub1.s1
from (select p_mfgr, p_name,
count(p_size) over (partition by p_mfgr order by p_name) as cd,
p_retailprice,
-sum(p_retailprice) over w1 as s1
+round(sum(p_retailprice) over w1,2) as s1
from noop(on part
partition by p_mfgr
order by p_name)
@@ -234,7 +234,7 @@ sub1.cd, sub1.s1
from (select p_mfgr, p_name,
count(p_size) over (partition by p_mfgr order by p_name) as cd,
p_retailprice,
-sum(p_retailprice) over w1 as s1
+round(sum(p_retailprice) over w1,2) as s1
from noop(on part
partition by p_mfgr
order by p_name)
@@ -247,7 +247,7 @@ select abc.p_mfgr, abc.p_name,
rank() over (distribute by abc.p_mfgr sort by abc.p_name) as r,
dense_rank() over (distribute by abc.p_mfgr sort by abc.p_name) as dr,
count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd,
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1,
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over (distribute by abc.p_mfgr sort by abc.p_name) as deltaSz
from noop(on part
partition by p_mfgr
@@ -258,8 +258,8 @@ order by p_name
select abc.p_mfgr, abc.p_name,
rank() over (distribute by abc.p_mfgr sort by abc.p_name) as r,
dense_rank() over (distribute by abc.p_mfgr sort by abc.p_name) as dr,
-count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd,
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1,
+count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd,
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over (distribute by abc.p_mfgr sort by abc.p_name) as deltaSz
from noop(on part
partition by p_mfgr
@@ -283,20 +283,20 @@ order by p_name);
-- 16. testViewAsTableInputToPTF
create view IF NOT EXISTS mfgr_price_view as
select p_mfgr, p_brand,
-sum(p_retailprice) as s
+round(sum(p_retailprice),2) as s
from part
group by p_mfgr, p_brand;
explain
select p_mfgr, p_brand, s,
-sum(s) over w1 as s1
+round(sum(s) over w1,2) as s1
from noop(on mfgr_price_view
partition by p_mfgr
order by p_mfgr)
window w1 as ( partition by p_mfgr order by p_brand rows between 2 preceding and current row);
select p_mfgr, p_brand, s,
-sum(s) over w1 as s1
+round(sum(s) over w1,2) as s1
from noop(on mfgr_price_view
partition by p_mfgr
order by p_mfgr)
@@ -328,7 +328,7 @@ order by p_name)
INSERT OVERWRITE TABLE part_4 select p_mfgr, p_name, p_size,
rank() over (distribute by p_mfgr sort by p_name) as r,
dense_rank() over (distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
INSERT OVERWRITE TABLE part_5 select p_mfgr,p_name, p_size,
round(sum(p_size) over (distribute by p_mfgr sort by p_size range between 5 preceding and current row),1) as s2,
rank() over (distribute by p_mfgr sort by p_mfgr, p_name) as r,
@@ -343,7 +343,7 @@ order by p_name)
INSERT OVERWRITE TABLE part_4 select p_mfgr, p_name, p_size,
rank() over (distribute by p_mfgr sort by p_name) as r,
dense_rank() over (distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
INSERT OVERWRITE TABLE part_5 select p_mfgr,p_name, p_size,
round(sum(p_size) over (distribute by p_mfgr sort by p_size range between 5 preceding and current row),1) as s2,
rank() over (distribute by p_mfgr sort by p_mfgr, p_name) as r,
http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/test/queries/clientpositive/vectorized_ptf.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vectorized_ptf.q b/ql/src/test/queries/clientpositive/vectorized_ptf.q
index 64082e9..db2dbe1 100644
--- a/ql/src/test/queries/clientpositive/vectorized_ptf.q
+++ b/ql/src/test/queries/clientpositive/vectorized_ptf.q
@@ -46,7 +46,7 @@ explain extended
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part_orc
partition by p_mfgr
order by p_name
@@ -97,7 +97,7 @@ explain extended
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part_orc
partition by p_mfgr
order by p_name
@@ -106,7 +106,7 @@ from noop(on part_orc
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part_orc
partition by p_mfgr
order by p_name
@@ -211,7 +211,7 @@ explain extended
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noopwithmap(on part_orc
partition by p_mfgr
order by p_name);
@@ -219,7 +219,7 @@ from noopwithmap(on part_orc
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noopwithmap(on part_orc
partition by p_mfgr
order by p_name);
@@ -230,7 +230,7 @@ explain extended
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part_orc
partition by p_mfgr
order by p_name)
@@ -239,7 +239,7 @@ order by p_name)
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on part_orc
partition by p_mfgr
order by p_name)
@@ -251,7 +251,7 @@ explain extended
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on noopwithmap(on noop(on part_orc
partition by p_mfgr
order by p_mfgr, p_name
@@ -260,7 +260,7 @@ order by p_mfgr, p_name
select p_mfgr, p_name, p_size,
rank() over (partition by p_mfgr order by p_name) as r,
dense_rank() over (partition by p_mfgr order by p_name) as dr,
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between unbounded preceding and current row),2) as s1
from noop(on noopwithmap(on noop(on part_orc
partition by p_mfgr
order by p_mfgr, p_name
@@ -274,7 +274,7 @@ sub1.cd, sub1.s1
from (select p_mfgr, p_name,
count(p_size) over (partition by p_mfgr order by p_name) as cd,
p_retailprice,
-sum(p_retailprice) over w1 as s1
+round(sum(p_retailprice) over w1,2) as s1
from noop(on part_orc
partition by p_mfgr
order by p_name)
@@ -286,7 +286,7 @@ sub1.cd, sub1.s1
from (select p_mfgr, p_name,
count(p_size) over (partition by p_mfgr order by p_name) as cd,
p_retailprice,
-sum(p_retailprice) over w1 as s1
+round(sum(p_retailprice) over w1,2) as s1
from noop(on part_orc
partition by p_mfgr
order by p_name)
@@ -300,7 +300,7 @@ select abc.p_mfgr, abc.p_name,
rank() over (distribute by abc.p_mfgr sort by abc.p_name) as r,
dense_rank() over (distribute by abc.p_mfgr sort by abc.p_name) as dr,
count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd,
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1,
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over (distribute by abc.p_mfgr sort by abc.p_name) as deltaSz
from noop(on part_orc
partition by p_mfgr
@@ -312,7 +312,7 @@ select abc.p_mfgr, abc.p_name,
rank() over (distribute by abc.p_mfgr sort by abc.p_name) as r,
dense_rank() over (distribute by abc.p_mfgr sort by abc.p_name) as dr,
count(abc.p_name) over (distribute by abc.p_mfgr sort by abc.p_name) as cd,
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1,
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over (distribute by abc.p_mfgr sort by abc.p_name) as deltaSz
from noop(on part_orc
partition by p_mfgr
@@ -337,20 +337,20 @@ order by p_name);
-- 16. testViewAsTableInputToPTF
create view IF NOT EXISTS mfgr_price_view as
select p_mfgr, p_brand,
-sum(p_retailprice) as s
+round(sum(p_retailprice),2) as s
from part_orc
group by p_mfgr, p_brand;
explain extended
select p_mfgr, p_brand, s,
-sum(s) over w1 as s1
+round(sum(s) over w1,2) as s1
from noop(on mfgr_price_view
partition by p_mfgr
order by p_mfgr)
window w1 as ( partition by p_mfgr order by p_brand rows between 2 preceding and current row);
select p_mfgr, p_brand, s,
-sum(s) over w1 as s1
+round(sum(s) over w1,2) as s1
from noop(on mfgr_price_view
partition by p_mfgr
order by p_mfgr)
@@ -382,7 +382,7 @@ order by p_name)
INSERT OVERWRITE TABLE part_4 select p_mfgr, p_name, p_size,
rank() over (distribute by p_mfgr sort by p_name) as r,
dense_rank() over (distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
INSERT OVERWRITE TABLE part_5 select p_mfgr,p_name, p_size,
round(sum(p_size) over (distribute by p_mfgr sort by p_size range between 5 preceding and current row),1) as s2,
rank() over (distribute by p_mfgr sort by p_mfgr, p_name) as r,
@@ -397,7 +397,7 @@ order by p_name)
INSERT OVERWRITE TABLE part_4 select p_mfgr, p_name, p_size,
rank() over (distribute by p_mfgr sort by p_name) as r,
dense_rank() over (distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
INSERT OVERWRITE TABLE part_5 select p_mfgr,p_name, p_size,
round(sum(p_size) over (distribute by p_mfgr sort by p_size range between 5 preceding and current row),1) as s2,
rank() over (distribute by p_mfgr sort by p_mfgr, p_name) as r,
http://git-wip-us.apache.org/repos/asf/hive/blob/a28b28f3/ql/src/test/queries/clientpositive/windowing.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing.q b/ql/src/test/queries/clientpositive/windowing.q
index e60a6ef..19fa1ad 100644
--- a/ql/src/test/queries/clientpositive/windowing.q
+++ b/ql/src/test/queries/clientpositive/windowing.q
@@ -6,7 +6,7 @@ set mapred.reduce.tasks=4;
select p_mfgr, p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
from part
;
@@ -41,7 +41,7 @@ select p_mfgr, p_name,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
count(p_size) over(distribute by p_mfgr sort by p_name) as cd,
-p_retailprice, sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1,
+p_retailprice, round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1,
p_size, p_size - lag(p_size,1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz
from part
;
@@ -52,7 +52,7 @@ from (select p_mfgr, p_name,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
count(p_size) over(distribute by p_mfgr sort by p_name) as cd,
-p_retailprice, sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1,
+p_retailprice, round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1,
p_size, p_size - lag(p_size,1,p_size) over(distribute by p_mfgr sort by p_name) as deltaSz
from part
) sub1;
@@ -61,7 +61,7 @@ from part
select abc.p_mfgr, abc.p_name,
rank() over(distribute by abc.p_mfgr sort by abc.p_name) as r,
dense_rank() over(distribute by abc.p_mfgr sort by abc.p_name) as dr,
-abc.p_retailprice, sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row) as s1,
+abc.p_retailprice, round(sum(abc.p_retailprice) over (distribute by abc.p_mfgr sort by abc.p_name rows between unbounded preceding and current row),2) as s1,
abc.p_size, abc.p_size - lag(abc.p_size,1,abc.p_size) over(distribute by abc.p_mfgr sort by abc.p_name) as deltaSz
from noop(on part
partition by p_mfgr
@@ -79,7 +79,7 @@ from part
select p_mfgr, p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
from part
;
@@ -87,7 +87,7 @@ from part
select p_mfgr, p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name) as r,
dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s1
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s1
from part
;
@@ -159,19 +159,19 @@ window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 precedi
-- 18. testUDAFs
select p_mfgr,p_name, p_size,
-sum(p_retailprice) over w1 as s,
+round(sum(p_retailprice) over w1,2) as s,
min(p_retailprice) over w1 as mi,
max(p_retailprice) over w1 as ma,
-avg(p_retailprice) over w1 as ag
+round(avg(p_retailprice) over w1,2) as ag
from part
window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 preceding and 2 following);
-- 19. testUDAFsWithGBY
select p_mfgr,p_name, p_size, p_retailprice,
-sum(p_retailprice) over w1 as s,
+round(sum(p_retailprice) over w1,2) as s,
min(p_retailprice) as mi ,
max(p_retailprice) as ma ,
-avg(p_retailprice) over w1 as ag
+round(avg(p_retailprice) over w1,2) as ag
from part
group by p_mfgr,p_name, p_size, p_retailprice
window w1 as (distribute by p_mfgr sort by p_mfgr, p_name rows between 2 preceding and 2 following);
@@ -219,7 +219,7 @@ window w1 as (distribute by p_mfgr sort by p_brand rows between 2 preceding and
-- 23. testCreateViewWithWindowingQuery
create view IF NOT EXISTS mfgr_brand_price_view as
select p_mfgr, p_brand,
-sum(p_retailprice) over w1 as s
+round(sum(p_retailprice) over w1,2) as s
from part
window w1 as (distribute by p_mfgr sort by p_name rows between 2 preceding and current row);
@@ -264,7 +264,7 @@ INSERT OVERWRITE TABLE part_1
select p_mfgr, p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name ) as r,
dense_rank() over(distribute by p_mfgr sort by p_name ) as dr,
-sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row) as s
+round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between unbounded preceding and current row),2) as s
INSERT OVERWRITE TABLE part_2
select p_mfgr,p_name, p_size,
rank() over(distribute by p_mfgr sort by p_name) as r,
@@ -389,7 +389,7 @@ from part;
-- 38. testPartitioningVariousForms2
select p_mfgr, p_name, p_size,
-sum(p_retailprice) over (partition by p_mfgr, p_name order by p_mfgr, p_name rows between unbounded preceding and current row) as s1,
+round(sum(p_retailprice) over (partition by p_mfgr, p_name order by p_mfgr, p_name rows between unbounded preceding and current row),2) as s1,
min(p_retailprice) over (distribute by p_mfgr, p_name sort by p_mfgr, p_name rows between unbounded preceding and current row) as s2,
max(p_retailprice) over (partition by p_mfgr, p_name order by p_name) as s3
from part;
@@ -401,22 +401,22 @@ from part;
-- 40. testNoBetweenForRows
select p_mfgr, p_name, p_size,
- sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows unbounded preceding) as s1
+ round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows unbounded preceding),2) as s1
from part ;
-- 41. testNoBetweenForRange
select p_mfgr, p_name, p_size,
- sum(p_retailprice) over (distribute by p_mfgr sort by p_size range unbounded preceding) as s1
+ round(sum(p_retailprice) over (distribute by p_mfgr sort by p_size range unbounded preceding),2) as s1
from part ;
-- 42. testUnboundedFollowingForRows
select p_mfgr, p_name, p_size,
- sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between current row and unbounded following) as s1
+ round(sum(p_retailprice) over (distribute by p_mfgr sort by p_name rows between current row and unbounded following),2) as s1
from part ;
-- 43. testUnboundedFollowingForRange
select p_mfgr, p_name, p_size,
- sum(p_retailprice) over (distribute by p_mfgr sort by p_size range between current row and unbounded following) as s1
+ round(sum(p_retailprice) over (distribute by p_mfgr sort by p_size range between current row and unbounded following),2) as s1
from part ;
-- 44. testOverNoPartitionSingleAggregate
@@ -433,8 +433,8 @@ where p_mfgr = 'Manufacturer#6'
;
-- 46. window sz is same as partition sz
-select p_retailprice, avg(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),
-sum(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following)
+select p_retailprice, round(avg(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),2),
+round(sum(p_retailprice) over (partition by p_mfgr order by p_name rows between current row and 6 following),2)
from part
where p_mfgr='Manufacturer#1';