You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/06/03 20:15:17 UTC
svn commit: r1489089 - in /hive/branches/vectorization/ql/src:
java/org/apache/hadoop/hive/ql/exec/vector/
java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/
java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/
java/or...
Author: hashutosh
Date: Mon Jun 3 18:15:16 2013
New Revision: 1489089
URL: http://svn.apache.org/r1489089
Log:
HIVE-4451 : Add support for string column type vector aggregates: COUNT, MIN and MAX (Remus Rusanu via Ashutosh Chauhan)
Added:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt
Removed:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1489089&r1=1489088&r2=1489089&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Mon Jun 3 18:15:16 2013
@@ -41,15 +41,16 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.SelectColumnIsTrue;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgLong;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFCountDouble;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFCountLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxLong;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxString;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinLong;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinString;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdSampDouble;
@@ -941,11 +942,14 @@ public class VectorizationContext {
static Object[][] aggregatesDefinition = {
{"min", "Long", VectorUDAFMinLong.class},
{"min", "Double", VectorUDAFMinDouble.class},
+ {"min", "String", VectorUDAFMinString.class},
{"max", "Long", VectorUDAFMaxLong.class},
{"max", "Double", VectorUDAFMaxDouble.class},
+ {"max", "String", VectorUDAFMaxString.class},
{"count", null, VectorUDAFCountStar.class},
- {"count", "Long", VectorUDAFCountLong.class},
- {"count", "Double", VectorUDAFCountDouble.class},
+ {"count", "Long", VectorUDAFCount.class},
+ {"count", "Double", VectorUDAFCount.class},
+ {"count", "String", VectorUDAFCount.class},
{"sum", "Long", VectorUDAFSumLong.class},
{"sum", "Double", VectorUDAFSumDouble.class},
{"avg", "Long", VectorUDAFAvgLong.class},
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java?rev=1489089&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java Mon Jun 3 18:15:16 2013
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+* VectorUDAFCountLong. Vectorized implementation for COUNT aggregates.
+*/
+@Description(name = "count", value = "_FUNC_(expr) - Returns the count (vectorized)")
+public class VectorUDAFCount extends VectorAggregateExpression {
+
+ /**
+ /* class for storing the current aggregate value.
+ */
+ static class Aggregation implements AggregationBuffer {
+ long value;
+ boolean isNull;
+
+ public void initIfNull() {
+ if (isNull) {
+ isNull = false;
+ value = 0;
+ }
+ }
+ }
+
+ private final VectorExpression inputExpression;
+ private final LongWritable result;
+
+ public VectorUDAFCount(VectorExpression inputExpression) {
+ super();
+ this.inputExpression = inputExpression;
+ result = new LongWritable(0);
+ }
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+ return myagg;
+ }
+
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ ColumnVector inputVector = batch.cols[this.inputExpression.getOutputColumn()];
+
+ if (inputVector.noNulls) {
+ // if there are no nulls then the iteration is the same on all cases
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, batchSize);
+ } else if (!batch.selectedInUse) {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, inputVector.isNull);
+ } else if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, batch.selected, inputVector.isNull);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ i);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregateIndex,
+ j);
+ myagg.initIfNull();
+ myagg.value++;
+ }
+ }
+ }
+
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ ColumnVector inputVector = batch.cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ myagg.initIfNull();
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ myagg.value += batchSize;
+ }
+ return;
+ }
+
+ if (inputVector.noNulls) {
+ myagg.value += batchSize;
+ return;
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
+ }
+ }
+
+ private void iterateSelectionHasNulls(
+ Aggregation myagg,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ if (!isNull[i]) {
+ myagg.value += 1;
+ }
+ }
+ }
+
+ private void iterateNoSelectionHasNulls(
+ Aggregation myagg,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i< batchSize; ++i) {
+ if (!isNull[i]) {
+ myagg.value += 1;
+ }
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ return new Aggregation();
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ Aggregation myAgg = (Aggregation) agg;
+ myAgg.isNull = true;
+ }
+
+ @Override
+ public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+ Aggregation myagg = (Aggregation) agg;
+ if (myagg.isNull) {
+ return null;
+ }
+ else {
+ result.set (myagg.value);
+ return result;
+ }
+ }
+
+ @Override
+ public ObjectInspector getOutputObjectInspector() {
+ return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ }
+
+}
+
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java?rev=1489089&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java Mon Jun 3 18:15:16 2013
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+ VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+* VectorUDAFMaxString. Vectorized implementation for MIN/MAX aggregates.
+*/
+@Description(name = "max", value = "_FUNC_(expr) - Returns the minimum value of expr (vectorized, type: string)")
+public class VectorUDAFMaxString extends VectorAggregateExpression {
+
+ /**
+ /* class for storing the current aggregate value.
+ */
+ static private final class Aggregation implements AggregationBuffer {
+
+ final static int MIN_BUFFER_SIZE = 16;
+ byte[] bytes = new byte[MIN_BUFFER_SIZE];
+ int length;
+ boolean isNull;
+
+ public void checkValue(byte[] bytes, int start, int length) {
+ if (isNull) {
+ isNull = false;
+ assign(bytes, start, length);
+ } else if (StringExpr.compare(
+ bytes, start, length,
+ this.bytes, 0, this.length) > 0) {
+ assign(bytes, start, length);
+ }
+ }
+
+ public void assign(byte[] bytes, int start, int length) {
+ // Avoid new allocation if possible
+ if (this.bytes.length < length) {
+ this.bytes = new byte[length];
+ }
+ System.arraycopy(bytes, start, this.bytes, 0, length);
+ this.length = length;
+ }
+ }
+
+ private VectorExpression inputExpression;
+ private BytesWritable result;
+
+ public VectorUDAFMaxString(VectorExpression inputExpression) {
+ super();
+ this.inputExpression = inputExpression;
+ result = new BytesWritable();
+ }
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+@Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ BytesColumnVector inputColumn = (BytesColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputColumn.noNulls) {
+ if (inputColumn.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ } else {
+ if (inputColumn.isRepeating) {
+ // All nulls, no-op for min/max
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize, batch.selected);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ byte[] bytes = inputColumn.vector[0];
+ int start = inputColumn.start[0];
+ int length = inputColumn.length[0];
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(bytes, start, length);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[row],
+ inputColumn.start[row],
+ inputColumn.length[row]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selection) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ if (!inputColumn.isNull[row]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[row],
+ inputColumn.start[row],
+ inputColumn.length[row]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ BytesColumnVector inputColumn = (BytesColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ if (inputColumn.isRepeating) {
+ if (inputColumn.noNulls) {
+ myagg.checkValue(inputColumn.vector[0],
+ inputColumn.start[0],
+ inputColumn.length[0]);
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputColumn.noNulls) {
+ iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+ }
+ else if (inputColumn.noNulls){
+ iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ }
+
+ private void iterateSelectionHasNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ if (!inputColumn.isNull[i]) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ private void iterateSelectionNoNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ private void iterateNoSelectionHasNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i< batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i< batchSize; ++i) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ return new Aggregation();
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ Aggregation myAgg = (Aggregation) agg;
+ myAgg.isNull = true;
+ }
+
+ @Override
+ public Object evaluateOutput(
+ AggregationBuffer agg) throws HiveException {
+ Aggregation myagg = (Aggregation) agg;
+ if (myagg.isNull) {
+ return null;
+ }
+ else {
+ result.set(myagg.bytes, 0, myagg.length);
+ return result;
+ }
+ }
+
+ @Override
+ public ObjectInspector getOutputObjectInspector() {
+ return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ }
+}
+
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java?rev=1489089&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java Mon Jun 3 18:15:16 2013
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+ VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+* VectorUDAFMinString. Vectorized implementation for MIN/MAX aggregates.
+*/
+@Description(name = "min", value = "_FUNC_(expr) - Returns the minimum value of expr (vectorized, type: string)")
+public class VectorUDAFMinString extends VectorAggregateExpression {
+
+ /**
+ /* class for storing the current aggregate value.
+ */
+ static private final class Aggregation implements AggregationBuffer {
+
+ final static int MIN_BUFFER_SIZE = 16;
+ byte[] bytes = new byte[MIN_BUFFER_SIZE];
+ int length;
+ boolean isNull;
+
+ public void checkValue(byte[] bytes, int start, int length) {
+ if (isNull) {
+ isNull = false;
+ assign(bytes, start, length);
+ } else if (StringExpr.compare(
+ bytes, start, length,
+ this.bytes, 0, this.length) < 0) {
+ assign(bytes, start, length);
+ }
+ }
+
+ public void assign(byte[] bytes, int start, int length) {
+ // Avoid new allocation if possible
+ if (this.bytes.length < length) {
+ this.bytes = new byte[length];
+ }
+ System.arraycopy(bytes, start, this.bytes, 0, length);
+ this.length = length;
+ }
+ }
+
+ private VectorExpression inputExpression;
+ private BytesWritable result;
+
+ public VectorUDAFMinString(VectorExpression inputExpression) {
+ super();
+ this.inputExpression = inputExpression;
+ result = new BytesWritable();
+ }
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+@Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ BytesColumnVector inputColumn = (BytesColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputColumn.noNulls) {
+ if (inputColumn.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ } else {
+ if (inputColumn.isRepeating) {
+ // All nulls, no-op for min/max
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize, batch.selected);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ byte[] bytes = inputColumn.vector[0];
+ int start = inputColumn.start[0];
+ int length = inputColumn.length[0];
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(bytes, start, length);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[row],
+ inputColumn.start[row],
+ inputColumn.length[row]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selection) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ if (!inputColumn.isNull[row]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[row],
+ inputColumn.start[row],
+ inputColumn.length[row]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ BytesColumnVector inputColumn = (BytesColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ if (inputColumn.isRepeating) {
+ if (inputColumn.noNulls) {
+ myagg.checkValue(inputColumn.vector[0],
+ inputColumn.start[0],
+ inputColumn.length[0]);
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputColumn.noNulls) {
+ iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+ }
+ else if (inputColumn.noNulls){
+ iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ }
+
+ private void iterateSelectionHasNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ if (!inputColumn.isNull[i]) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ private void iterateSelectionNoNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ private void iterateNoSelectionHasNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i< batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i< batchSize; ++i) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ return new Aggregation();
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ Aggregation myAgg = (Aggregation) agg;
+ myAgg.isNull = true;
+ }
+
+ @Override
+ public Object evaluateOutput(
+ AggregationBuffer agg) throws HiveException {
+ Aggregation myagg = (Aggregation) agg;
+ if (myagg.isNull) {
+ return null;
+ }
+ else {
+ result.set(myagg.bytes, 0, myagg.length);
+ return result;
+ }
+ }
+
+ @Override
+ public ObjectInspector getOutputObjectInspector() {
+ return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ }
+}
+
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java?rev=1489089&r1=1489088&r2=1489089&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java Mon Jun 3 18:15:16 2013
@@ -194,9 +194,8 @@ public class CodeGen {
{"VectorUDAFMinMax", "VectorUDAFMaxLong", "long", ">", "max", "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: long)"},
{"VectorUDAFMinMax", "VectorUDAFMaxDouble", "double", ">", "max", "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: double)"},
- //template, <ClassName>, <ValueType>
- {"VectorUDAFCount", "VectorUDAFCountLong", "long"},
- {"VectorUDAFCount", "VectorUDAFCountDouble", "double"},
+ {"VectorUDAFMinMaxString", "VectorUDAFMinString", "<", "min", "_FUNC_(expr) - Returns the minimum value of expr (vectorized, type: string)"},
+ {"VectorUDAFMinMaxString", "VectorUDAFMaxString", ">", "max", "_FUNC_(expr) - Returns the minimum value of expr (vectorized, type: string)"},
//template, <ClassName>, <ValueType>
{"VectorUDAFSum", "VectorUDAFSumLong", "long"},
@@ -280,6 +279,8 @@ public class CodeGen {
generateVectorUDAFCount(tdesc);
} else if (tdesc[0].equals("VectorUDAFMinMax")) {
generateVectorUDAFMinMax(tdesc);
+ } else if (tdesc[0].equals("VectorUDAFMinMaxString")) {
+ generateVectorUDAFMinMaxString(tdesc);
} else if (tdesc[0].equals("VectorUDAFSum")) {
generateVectorUDAFSum(tdesc);
} else if (tdesc[0].equals("VectorUDAFAvg")) {
@@ -323,6 +324,25 @@ public class CodeGen {
}
+ private void generateVectorUDAFMinMaxString(String[] tdesc) throws Exception {
+ String className = tdesc[1];
+ String operatorSymbol = tdesc[2];
+ String descName = tdesc[3];
+ String descValue = tdesc[4];
+
+ String outputFile = joinPath(this.outputDirectory, className + ".java");
+ String templateFile = joinPath(this.templateDirectory, tdesc[0] + ".txt");
+
+ String templateString = readFile(templateFile);
+ templateString = templateString.replaceAll("<ClassName>", className);
+ templateString = templateString.replaceAll("<OperatorSymbol>", operatorSymbol);
+ templateString = templateString.replaceAll("<DescriptionName>", descName);
+ templateString = templateString.replaceAll("<DescriptionValue>", descValue);
+ writeFile(outputFile, templateString);
+
+ }
+
+
private void generateVectorUDAFCount(String[] tdesc) throws IOException {
String className = tdesc[1];
String valueType = tdesc[2];
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt?rev=1489089&r1=1489088&r2=1489089&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt Mon Jun 3 18:15:16 2013
@@ -44,7 +44,7 @@ import org.apache.hadoop.hive.serde2.obj
/**
* <ClassName>. Vectorized implementation for COUNT aggregates.
*/
-@Description(name = "count", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: <ValueType>)")
+@Description(name = "count", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized)")
public class <ClassName> extends VectorAggregateExpression {
/**
@@ -94,7 +94,7 @@ public class <ClassName> extends VectorA
inputExpression.evaluate(batch);
- <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
+ VectorColumn inputVector = (VectorColumn)batch.
cols[this.inputExpression.getOutputColumn()];
if (inputVector.noNulls) {
@@ -172,7 +172,7 @@ public class <ClassName> extends VectorA
inputExpression.evaluate(batch);
- <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
+ VectorColumn inputVector = (VectorColumn)batch.
cols[this.inputExpression.getOutputColumn()];
int batchSize = batch.size;
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt?rev=1489089&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt Mon Jun 3 18:15:16 2013
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+ VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+* <ClassName>. Vectorized implementation for MIN/MAX aggregates.
+*/
+@Description(name = "<DescriptionName>", value = "<DescriptionValue>")
+public class <ClassName> extends VectorAggregateExpression {
+
+ /**
+ /* class for storing the current aggregate value.
+ */
+ static private final class Aggregation implements AggregationBuffer {
+
+ final static int MIN_BUFFER_SIZE = 16;
+ byte[] bytes = new byte[MIN_BUFFER_SIZE];
+ int length;
+ boolean isNull;
+
+ public void checkValue(byte[] bytes, int start, int length) {
+ if (isNull) {
+ isNull = false;
+ assign(bytes, start, length);
+ } else if (StringExpr.compare(
+ bytes, start, length,
+ this.bytes, 0, this.length) <OperatorSymbol> 0) {
+ assign(bytes, start, length);
+ }
+ }
+
+ public void assign(byte[] bytes, int start, int length) {
+ // Avoid new allocation if possible
+ if (this.bytes.length < length) {
+ this.bytes = new byte[length];
+ }
+ System.arraycopy(bytes, start, this.bytes, 0, length);
+ this.length = length;
+ }
+ }
+
+ private VectorExpression inputExpression;
+ private BytesWritable result;
+
+ public <ClassName>(VectorExpression inputExpression) {
+ super();
+ this.inputExpression = inputExpression;
+ result = new BytesWritable();
+ }
+
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+@Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ BytesColumnVector inputColumn = (BytesColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputColumn.noNulls) {
+ if (inputColumn.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ } else {
+ if (inputColumn.isRepeating) {
+ // All nulls, no-op for min/max
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize, batch.selected);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColumn, batchSize);
+ }
+ }
+ }
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ byte[] bytes = inputColumn.vector[0];
+ int start = inputColumn.start[0];
+ int length = inputColumn.length[0];
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(bytes, start, length);
+ }
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[row],
+ inputColumn.start[row],
+ inputColumn.length[row]);
+ }
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selection) {
+
+ for (int i=0; i < batchSize; ++i) {
+ int row = selection[i];
+ if (!inputColumn.isNull[row]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[row],
+ inputColumn.start[row],
+ inputColumn.length[row]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ BytesColumnVector inputColumn = (BytesColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ if (inputColumn.isRepeating) {
+ if (inputColumn.noNulls) {
+ myagg.checkValue(inputColumn.vector[0],
+ inputColumn.start[0],
+ inputColumn.length[0]);
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputColumn.noNulls) {
+ iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+ }
+ else if (inputColumn.noNulls){
+ iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+ }
+ }
+
+ private void iterateSelectionHasNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ if (!inputColumn.isNull[i]) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ private void iterateSelectionNoNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize,
+ int[] selected) {
+
+ for (int i=0; i< batchSize; ++i) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ private void iterateNoSelectionHasNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+
+ for (int i=0; i< batchSize; ++i) {
+ if (!inputColumn.isNull[i]) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+ }
+
+ private void iterateNoSelectionNoNulls(
+ Aggregation myagg,
+ BytesColumnVector inputColumn,
+ int batchSize) {
+ for (int i=0; i< batchSize; ++i) {
+ myagg.checkValue(inputColumn.vector[i],
+ inputColumn.start[i],
+ inputColumn.length[i]);
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ return new Aggregation();
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ Aggregation myAgg = (Aggregation) agg;
+ myAgg.isNull = true;
+ }
+
+ @Override
+ public Object evaluateOutput(
+ AggregationBuffer agg) throws HiveException {
+ Aggregation myagg = (Aggregation) agg;
+ if (myagg.isNull) {
+ return null;
+ }
+ else {
+ result.set(myagg.bytes, 0, myagg.length);
+ return result;
+ }
+ }
+
+ @Override
+ public ObjectInspector getOutputObjectInspector() {
+ return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+ }
+}
+
Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java?rev=1489089&r1=1489088&r2=1489089&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java Mon Jun 3 18:15:16 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -66,9 +67,10 @@ public class TestVectorGroupByOperator {
private static AggregationDesc buildAggregationDesc(
VectorizationContext ctx,
String aggregate,
- String column) {
+ String column,
+ TypeInfo typeInfo) {
- ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, TypeInfoFactory.longTypeInfo);
+ ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
params.add(inputColumn);
@@ -88,12 +90,13 @@ public class TestVectorGroupByOperator {
}
- private static GroupByDesc buildGroupByDesc(
+ private static GroupByDesc buildGroupByDescLong(
VectorizationContext ctx,
String aggregate,
String column) {
- AggregationDesc agg = buildAggregationDesc(ctx, aggregate, column);
+ AggregationDesc agg = buildAggregationDesc(ctx, aggregate,
+ column, TypeInfoFactory.longTypeInfo);
ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
aggs.add(agg);
@@ -106,6 +109,28 @@ public class TestVectorGroupByOperator {
return desc;
}
+
+ private static GroupByDesc buildGroupByDescString(
+ VectorizationContext ctx,
+ String aggregate,
+ String column) {
+
+ AggregationDesc agg = buildAggregationDesc(ctx, aggregate,
+ column, TypeInfoFactory.stringTypeInfo);
+ ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
+ aggs.add(agg);
+
+ ArrayList<String> outputColumnNames = new ArrayList<String>();
+ outputColumnNames.add("_col0");
+
+ GroupByDesc desc = new GroupByDesc();
+ desc.setOutputColumnNames(outputColumnNames);
+ desc.setAggregators(aggs);
+
+ return desc;
+ }
+
+
private static GroupByDesc buildGroupByDescCountStar(
VectorizationContext ctx) {
@@ -131,7 +156,7 @@ public class TestVectorGroupByOperator {
TypeInfo typeInfo,
String key) {
- GroupByDesc desc = buildGroupByDesc(ctx, aggregate, column);
+ GroupByDesc desc = buildGroupByDescLong(ctx, aggregate, column);
ExprNodeDesc keyExp = buildColumnDesc(ctx, key, typeInfo);
ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
@@ -150,6 +175,76 @@ public class TestVectorGroupByOperator {
}
@Test
+ public void testCountString () throws HiveException {
+ testAggregateString(
+ "count",
+ 2,
+ Arrays.asList(new Object[]{"A","B","C"}),
+ 3L);
+ }
+
+ @Test
+ public void testMaxString () throws HiveException {
+ testAggregateString(
+ "max",
+ 2,
+ Arrays.asList(new Object[]{"A","B","C"}),
+ "C");
+ testAggregateString(
+ "max",
+ 2,
+ Arrays.asList(new Object[]{"C", "B", "A"}),
+ "C");
+ }
+
+ @Test
+ public void testMinString () throws HiveException {
+ testAggregateString(
+ "min",
+ 2,
+ Arrays.asList(new Object[]{"A","B","C"}),
+ "A");
+ testAggregateString(
+ "min",
+ 2,
+ Arrays.asList(new Object[]{"C", "B", "A"}),
+ "A");
+ }
+
+ @Test
+ public void testMaxNullString () throws HiveException {
+ testAggregateString(
+ "max",
+ 2,
+ Arrays.asList(new Object[]{"A","B",null}),
+ "B");
+ testAggregateString(
+ "max",
+ 2,
+ Arrays.asList(new Object[]{null, null, null}),
+ null);
+ }
+
+ @Test
+ public void testCountStringWithNull () throws HiveException {
+ testAggregateString(
+ "count",
+ 2,
+ Arrays.asList(new Object[]{"A",null,"C", "D", null}),
+ 3L);
+ }
+
+ @Test
+ public void testCountStringAllNull () throws HiveException {
+ testAggregateString(
+ "count",
+ 4,
+ Arrays.asList(new Object[]{null, null, null, null, null}),
+ 0L);
+ }
+
+
+ @Test
public void testMinLongNullStringKeys() throws HiveException {
testAggregateStringKeyAggregate(
"min",
@@ -969,6 +1064,19 @@ public class TestVectorGroupByOperator {
testAggregateLongKeyIterable (aggregateName, fdr, expected);
}
+ public void testAggregateString (
+ String aggregateName,
+ int batchSize,
+ Iterable<Object> values,
+ Object expected) throws HiveException {
+
+ @SuppressWarnings("unchecked")
+ FakeVectorRowBatchFromObjectIterables fdr = new FakeVectorRowBatchFromObjectIterables(
+ batchSize, new String[] {"string"}, values);
+ testAggregateStringIterable (aggregateName, fdr, expected);
+ }
+
+
public void testAggregateLongAggregate (
String aggregateName,
int batchSize,
@@ -1001,14 +1109,19 @@ public class TestVectorGroupByOperator {
assertEquals(true, result instanceof Object[]);
Object[] arr = (Object[]) result;
- assertEquals (1, arr.length);
+ assertEquals(1, arr.length);
if (expected == null) {
assertNull (arr[0]);
- } else {
- assertEquals (true, arr[0] instanceof LongWritable);
+ } else if (arr[0] instanceof LongWritable) {
LongWritable lw = (LongWritable) arr[0];
- assertEquals ((Long) expected, (Long) lw.get());
+ assertEquals((Long) expected, (Long) lw.get());
+ } else if (arr[0] instanceof BytesWritable) {
+ BytesWritable bw = (BytesWritable) arr[0];
+ String sbw = new String(bw.getBytes());
+ assertEquals((String) expected, sbw);
+ } else {
+ Assert.fail("Unsupported result type: " + expected.getClass().getName());
}
}
}
@@ -1159,6 +1272,37 @@ public class TestVectorGroupByOperator {
validator.validate(expected, result);
}
+ public void testAggregateStringIterable (
+ String aggregateName,
+ Iterable<VectorizedRowBatch> data,
+ Object expected) throws HiveException {
+ Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+ mapColumnNames.put("A", 0);
+ VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
+
+ GroupByDesc desc = buildGroupByDescString (ctx, aggregateName, "A");
+
+ VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+ FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+ vgo.initialize(null, null);
+
+ for (VectorizedRowBatch unit: data) {
+ vgo.process(unit, 0);
+ }
+ vgo.close(false);
+
+ List<Object> outBatchList = out.getCapturedRows();
+ assertNotNull(outBatchList);
+ assertEquals(1, outBatchList.size());
+
+ Object result = outBatchList.get(0);
+
+ Validator validator = getValidator(aggregateName);
+ validator.validate(expected, result);
+ }
+
+
public void testAggregateLongIterable (
String aggregateName,
Iterable<VectorizedRowBatch> data,
@@ -1167,7 +1311,7 @@ public class TestVectorGroupByOperator {
mapColumnNames.put("A", 0);
VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
- GroupByDesc desc = buildGroupByDesc (ctx, aggregateName, "A");
+ GroupByDesc desc = buildGroupByDescLong (ctx, aggregateName, "A");
VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);