You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2013/05/22 22:58:09 UTC
svn commit: r1485419 [4/4] - in /hive/branches/vectorization:
metastore/src/java/org/apache/hadoop/hive/metastore/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/
ql/src/java/org/apache/hadoop/hive/ql/exec...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt Wed May 22 20:58:08 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,9 +51,18 @@ public class <ClassName> extends VectorA
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
<ValueType> sum;
boolean isNull;
+
+ public void sumValue(<ValueType> value) {
+ if (isNull) {
+ sum = value;
+ isNull = false;
+ } else {
+ sum += value;
+ }
+ }
}
VectorExpression inputExpression;
@@ -63,17 +73,207 @@ public class <ClassName> extends VectorA
this.inputExpression = inputExpression;
result = new <OutputType>();
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ vector, batchSize, inputVector.isNull);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[selection[i]]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- <InputColumnVectorType> inputVector = (<InputColumnVectorType>)unit.
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -94,17 +294,17 @@ public class <ClassName> extends VectorA
return;
}
- if (!unit.selectedInUse && inputVector.noNulls) {
+ if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt Wed May 22 20:58:08 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
.VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -50,13 +51,13 @@ public class <ClassName> extends VectorA
/**
/* class for storing the current aggregate value.
*/
- static private final class Aggregation implements AggregationBuffer {
+ private static final class Aggregation implements AggregationBuffer {
double sum;
long count;
double variance;
boolean isNull;
- public void init () {
+ public void init() {
isNull = false;
sum = 0;
count = 0;
@@ -86,7 +87,7 @@ public class <ClassName> extends VectorA
initPartialResultInspector();
}
- private void initPartialResultInspector () {
+ private void initPartialResultInspector() {
ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
@@ -99,17 +100,200 @@ public class <ClassName> extends VectorA
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ <ValueType>[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateRepeatingNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector[0], batchSize);
+ }
+ }
+ else if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, vector, batchSize,
+ inputVector.isNull, batch.selected);
+ }
+
+ }
+ private void iterateRepeatingNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType> value,
+ int batchSize) {
+
+ for (int i=0; i<batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType>[] vector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ int i = selected[j];
+ if (!isNull[i]) {
+ <ValueType> value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType>[] vector,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ <ValueType> value = vector[selected[i]];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType>[] vector,
+ int batchSize,
+ boolean[] isNull) {
+
+ for(int i=0;i<batchSize;++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ <ValueType> value = vector[i];
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ <ValueType>[] vector,
+ int batchSize) {
+
+ for (int i=0; i<batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ if (myagg.isNull) {
+ myagg.init ();
+ }
+ <ValueType> value = vector[i];
+ myagg.sum += value;
+ myagg.count += 1;
+ if(myagg.count > 1) {
+ double t = myagg.count*value - myagg.sum;
+ myagg.variance += (t*t) / ((double)myagg.count*(myagg.count-1));
+ }
+ }
+ }
+
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- <InputColumnVectorType> inputVector = (<InputColumnVectorType>)unit.
+ <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -124,17 +308,17 @@ public class <ClassName> extends VectorA
iterateRepeatingNoNulls(myagg, vector[0], batchSize);
}
}
- else if (!unit.selectedInUse && inputVector.noNulls) {
+ else if (!batch.selectedInUse && inputVector.noNulls) {
iterateNoSelectionNoNulls(myagg, vector, batchSize);
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
}
else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
}
else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
}
}
Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java Wed May 22 20:58:08 2013
@@ -28,8 +28,10 @@ import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hive.ql.exec.vector.util.FakeCaptureOutputOperator;
import org.apache.hadoop.hive.ql.exec.vector.util.FakeVectorRowBatchFromConcat;
@@ -93,8 +95,144 @@ public class TestVectorGroupByOperator {
return desc;
}
+ private static GroupByDesc buildKeyGroupByDesc(
+ VectorizationContext ctx,
+ String aggregate,
+ String column,
+ String key) {
+
+ GroupByDesc desc = buildGroupByDesc(ctx, aggregate, column);
+
+ ExprNodeDesc keyExp = buildColumnDesc(ctx, key);
+ ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
+ keys.add(keyExp);
+ desc.setKeys(keys);
+
+ return desc;
+ }
+
+ @Test
+ public void testMinLongKeyGroupByCompactBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 2,
+ Arrays.asList(new Long[]{01L,1L,2L,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(1L, 5L, 2L, 7L));
+ }
+
+ @Test
+ public void testMinLongKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 4,
+ Arrays.asList(new Long[]{01L,1L,2L,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(1L, 5L, 2L, 7L));
+ }
+
+ @Test
+ public void testMinLongKeyGroupByCrossBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 2,
+ Arrays.asList(new Long[]{01L,2L,1L,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(1L, 7L, 2L, 5L));
+ }
+
+ @Test
+ public void testMinLongNullKeyGroupByCrossBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 2,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 7L, 2L, 5L));
+ }
+
+ @Test
+ public void testMinLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 7L, 2L, 5L));
+ }
+
+ @Test
+ public void testMaxLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "max",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 13L, 2L, 19L));
+ }
+
+ @Test
+ public void testCountLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "count",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 2L, 2L, 2L));
+ }
+
+ @Test
+ public void testSumLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "sum",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 20L, 2L, 24L));
+ }
+
+ @Test
+ public void testAvgLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "avg",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ buildHashMap(null, 10.0, 2L, 12.0));
+ }
+
+ @Test
+ public void testVarLongNullKeyGroupBySingleBatch() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "variance",
+ 4,
+ Arrays.asList(new Long[]{null,2L,01L,02L,01L,01L}),
+ Arrays.asList(new Long[]{13L, 5L,18L,19L,12L,15L}),
+ buildHashMap(null, 0.0, 2L, 49.0, 01L, 6.0));
+ }
+
+ @Test
+ public void testMinNullLongNullKeyGroupBy() throws HiveException {
+ testAggregateLongKeyAggregate(
+ "min",
+ 4,
+ Arrays.asList(new Long[]{null,2L,null,02L}),
+ Arrays.asList(new Long[]{null, null, null, null}),
+ buildHashMap(null, null, 2L, null));
+ }
+
+ @Test
+ public void testMinLongGroupBy() throws HiveException {
+ testAggregateLongAggregate(
+ "min",
+ 2,
+ Arrays.asList(new Long[]{13L,5L,7L,19L}),
+ 5L);
+ }
+
+
@Test
- public void testMinLongSimple () throws HiveException {
+ public void testMinLongSimple() throws HiveException {
testAggregateLongAggregate(
"min",
2,
@@ -735,7 +873,28 @@ public class TestVectorGroupByOperator {
new Long[] {value}, repeat, batchSize);
testAggregateLongIterable (aggregateName, fdr, expected);
}
+
+ public HashMap<Object, Object> buildHashMap(Object... pairs) {
+ HashMap<Object, Object> map = new HashMap<Object, Object>();
+ for(int i = 0; i < pairs.length; i += 2) {
+ map.put(pairs[i], pairs[i+1]);
+ }
+ return map;
+ }
+
+
+ public void testAggregateLongKeyAggregate (
+ String aggregateName,
+ int batchSize,
+ Iterable<Long> keys,
+ Iterable<Long> values,
+ HashMap<Object, Object> expected) throws HiveException {
+ @SuppressWarnings("unchecked")
+ FakeVectorRowBatchFromIterables fdr = new FakeVectorRowBatchFromIterables(batchSize, keys, values);
+ testAggregateLongKeyIterable (aggregateName, fdr, expected);
+ }
+
public void testAggregateLongAggregate (
String aggregateName,
int batchSize,
@@ -915,5 +1074,68 @@ public class TestVectorGroupByOperator {
Validator validator = getValidator(aggregateName);
validator.validate(expected, result);
}
+
+ public void testAggregateLongKeyIterable (
+ String aggregateName,
+ Iterable<VectorizedRowBatch> data,
+ HashMap<Object,Object> expected) throws HiveException {
+ Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+ mapColumnNames.put("Key", 0);
+ mapColumnNames.put("Value", 1);
+ VectorizationContext ctx = new VectorizationContext(mapColumnNames, 2);
+ Set<Object> keys = new HashSet<Object>();
+
+ GroupByDesc desc = buildKeyGroupByDesc (ctx, aggregateName, "Value", "Key");
+
+ VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+ FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+ vgo.initialize(null, null);
+ out.setOutputInspector(new FakeCaptureOutputOperator.OutputInspector() {
+
+ private int rowIndex;
+ private String aggregateName;
+ private HashMap<Object,Object> expected;
+ private Set<Object> keys;
+
+ @Override
+ public void inspectRow(Object row, int tag) throws HiveException {
+ assertTrue(row instanceof Object[]);
+ Object[] fields = (Object[]) row;
+ assertEquals(2, fields.length);
+ Object key = fields[0];
+ Long keyValue = null;
+ if (null != key) {
+ assertTrue(key instanceof LongWritable);
+ LongWritable lwKey = (LongWritable)key;
+ keyValue = lwKey.get();
+ }
+ assertTrue(expected.containsKey(keyValue));
+ Object expectedValue = expected.get(keyValue);
+ Object value = fields[1];
+ Validator validator = getValidator(aggregateName);
+ validator.validate(expectedValue, new Object[] {value});
+ keys.add(keyValue);
+ }
+
+ private FakeCaptureOutputOperator.OutputInspector init(
+ String aggregateName, HashMap<Object,Object> expected, Set<Object> keys) {
+ this.aggregateName = aggregateName;
+ this.expected = expected;
+ this.keys = keys;
+ return this;
+ }
+ }.init(aggregateName, expected, keys));
+
+ for (VectorizedRowBatch unit: data) {
+ vgo.process(unit, 0);
+ }
+ vgo.close(false);
+
+ List<Object> outBatchList = out.getCapturedRows();
+ assertNotNull(outBatchList);
+ assertEquals(expected.size(), outBatchList.size());
+ assertEquals(expected.size(), keys.size());
+ }
}
Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java Wed May 22 20:58:08 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.util.VectorizedRowGroupGenUtil;
import org.junit.Test;
public class TestConstantVectorExpression {
Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java Wed May 22 20:58:08 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.v
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +36,20 @@ import org.apache.hadoop.hive.ql.plan.ap
public class FakeCaptureOutputOperator extends Operator<FakeCaptureOutputDesc>
implements Serializable {
private static final long serialVersionUID = 1L;
+
+ public interface OutputInspector {
+ public void inspectRow(Object row, int tag) throws HiveException;
+ }
+
+ private OutputInspector outputInspector;
+
+ public void setOutputInspector(OutputInspector outputInspector) {
+ this.outputInspector = outputInspector;
+ }
+
+ public OutputInspector getOutputInspector() {
+ return outputInspector;
+ }
private transient List<Object> rows;
@@ -52,6 +67,7 @@ public class FakeCaptureOutputOperator e
return out;
}
+
public List<Object> getCapturedRows() {
return rows;
}
@@ -64,6 +80,9 @@ public class FakeCaptureOutputOperator e
@Override
public void processOp(Object row, int tag) throws HiveException {
rows.add(row);
+ if (null != outputInspector) {
+ outputInspector.inspectRow(row, tag);
+ }
}
@Override