You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2013/05/22 22:58:09 UTC
svn commit: r1485419 [1/4] - in /hive/branches/vectorization:
metastore/src/java/org/apache/hadoop/hive/metastore/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/
ql/src/java/org/apache/hadoop/hive/ql/exec...
Author: omalley
Date: Wed May 22 20:58:08 2013
New Revision: 1485419
URL: http://svn.apache.org/r1485419
Log:
HIVE-4450 Extend Vector Aggregates to support GROUP BY (Remus Rusanu via
omalley)
Added:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFAvg.txt
Modified:
hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java
Modified: hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (original)
+++ hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java Wed May 22 20:58:08 2013
@@ -51,7 +51,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.serde.serdeConstants;;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java?rev=1485419&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java Wed May 22 20:58:08 2013
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * Hash map key wrapper for vectorized processing.
+ * Key values are held as primitives: one array for the long-typed keys, one array for the
+ * double-typed keys, and a single array of null flags covering every key.
+ * It is populated by
+ * {@link org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch VectorHashKeyWrapperBatch}
+ * while hashing vectorized processing units (batches).
+ */
+public class VectorHashKeyWrapper extends KeyWrapper {
+
+  /** Values of the long-typed keys, indexed by long-key position. */
+  private long[] longValues;
+
+  /** Values of the double-typed keys, indexed by double-key position. */
+  private double[] doubleValues;
+
+  /**
+   * Null flags for every key. Entries [0, longValues.length) belong to the long keys;
+   * the following entries belong to the double keys, in double-key order.
+   */
+  private boolean[] isNull;
+
+  /** Hash code cached by {@link #setHashKey()} and returned by {@link #hashCode()}. */
+  private int hashcode;
+
+  /**
+   * Allocates storage for the given number of long and double key values.
+   * All values start at zero and all null flags start as false.
+   */
+  public VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount) {
+    final int keyCount = longValuesCount + doubleValuesCount;
+    this.longValues = new long[longValuesCount];
+    this.doubleValues = new double[doubleValuesCount];
+    this.isNull = new boolean[keyCount];
+  }
+
+  /** Used only by {@link #clone()}, which assigns every field itself. */
+  private VectorHashKeyWrapper() {
+  }
+
+  /** Row-mode key extraction is not supported for vectorized keys; always throws. */
+  @Override
+  void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+    throw new HiveException("Should not be called");
+  }
+
+  /**
+   * Recomputes the cached hash code from the current key values and null flags.
+   * Must be called after the key values change, because {@link #hashCode()} only
+   * returns the cached value.
+   */
+  @Override
+  void setHashKey() {
+    final int longHash = Arrays.hashCode(longValues);
+    final int doubleHash = Arrays.hashCode(doubleValues);
+    final int nullHash = Arrays.hashCode(isNull);
+    hashcode = longHash ^ doubleHash ^ nullHash;
+  }
+
+  /** Returns the hash code cached by the last {@link #setHashKey()} call. */
+  @Override
+  public int hashCode() {
+    return hashcode;
+  }
+
+  /**
+   * Two wrappers are equal when their cached hash codes, long values, double values
+   * and null flags are all equal.
+   */
+  @Override
+  public boolean equals(Object that) {
+    if (!(that instanceof VectorHashKeyWrapper)) {
+      return false;
+    }
+    final VectorHashKeyWrapper other = (VectorHashKeyWrapper) that;
+    if (hashcode != other.hashcode) {
+      return false;
+    }
+    return Arrays.equals(longValues, other.longValues)
+        && Arrays.equals(doubleValues, other.doubleValues)
+        && Arrays.equals(isNull, other.isNull);
+  }
+
+  /** Deep copy: every backing array is duplicated, so the copy is unaffected by later writes. */
+  @Override
+  protected Object clone() {
+    final VectorHashKeyWrapper copy = new VectorHashKeyWrapper();
+    copy.longValues = this.longValues.clone();
+    copy.doubleValues = this.doubleValues.clone();
+    copy.isNull = this.isNull.clone();
+    copy.hashcode = this.hashcode;
+    return copy;
+  }
+
+  /** Returns an independent deep copy of this key, as produced by {@link #clone()}. */
+  @Override
+  public KeyWrapper copyKey() {
+    final Object copy = clone();
+    return (KeyWrapper) copy;
+  }
+
+  /** Not implemented: currently a no-op that leaves this wrapper unchanged. */
+  @Override
+  void copyKey(KeyWrapper oldWrapper) {
+    // TODO Auto-generated method stub
+  }
+
+  /** Not implemented: currently always returns null. */
+  @Override
+  Object[] getKeyArray() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /** Stores a non-null value for the double key at double-key position {@code index}. */
+  public void assignDouble(int index, double d) {
+    final int nullSlot = longValues.length + index;
+    doubleValues[index] = d;
+    isNull[nullSlot] = false;
+  }
+
+  /** Marks the double key at double-key position {@code index} as null. */
+  public void assignNullDouble(int index) {
+    final int nullSlot = longValues.length + index;
+    // Zero the stored value so that all null keys hash and compare identically.
+    doubleValues[index] = 0.0;
+    isNull[nullSlot] = true;
+  }
+
+  /** Stores a non-null value for the long key at long-key position {@code index}. */
+  public void assignLong(int index, long v) {
+    longValues[index] = v;
+    isNull[index] = false;
+  }
+
+  /** Marks the long key at long-key position {@code index} as null. */
+  public void assignNullLong(int index) {
+    // Zero the stored value so that all null keys hash and compare identically.
+    longValues[index] = 0L;
+    isNull[index] = true;
+  }
+
+  /** Debug representation: count and contents of the long values, then of the double values. */
+  @Override
+  public String toString() {
+    final String longs = Arrays.toString(longValues);
+    final String doubles = Arrays.toString(doubleValues);
+    return String.format("%d[%s] %d[%s]",
+        longValues.length, longs,
+        doubleValues.length, doubles);
+  }
+
+  /**
+   * Returns the null flag at combined position {@code i}. Long keys occupy positions
+   * [0, longCount); double keys follow them.
+   */
+  public boolean getIsNull(int i) {
+    return isNull[i];
+  }
+
+  /** Returns the value of the long key at long-key position {@code i}. */
+  public long getLongValue(int i) {
+    return longValues[i];
+  }
+
+  /**
+   * Returns the value of the double key at combined position {@code i}, i.e. the
+   * long-key count plus the double-key position.
+   */
+  public double getDoubleValue(int i) {
+    final int doublePosition = i - longValues.length;
+    return doubleValues[doublePosition];
+  }
+
+}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java?rev=1485419&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java Wed May 22 20:58:08 2013
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Class for handling vectorized hash map key wrappers. It evaluates the key columns in a
+ * row batch in a vectorized fashion.
+ * This class stores additional information about keys needed to evaluate and output the key values.
+ *
+ */
+public class VectorHashKeyWrapperBatch {
+
+ /**
+ * Helper class for looking up a key value based on key index
+ *
+ */
+ private static class KeyLookupHelper {
+ public int longIndex;
+ public int doubleIndex;
+ }
+
+ /**
+ * The key expressions that require evaluation and output the primitive values for each key.
+ */
+ private VectorExpression[] keyExpressions;
+
+ /**
+ * indices of LONG primitive keys
+ */
+ private int[] longIndices;
+
+ /**
+ * indices of DOUBLE primitive keys
+ */
+ private int[] doubleIndices;
+
+ /**
+ * pre-allocated batch size vector of keys wrappers.
+ * N.B. these keys are **mutable** and should never be used in a HashMap.
+ * Always clone the key wrapper to obtain an immutable keywrapper suitable
+ * to use a key in a HashMap.
+ */
+ private VectorHashKeyWrapper[] vectorHashKeyWrappers;
+
+ /**
+ * lookup vector to map from key index to primitive type index
+ */
+ private KeyLookupHelper[] indexLookup;
+
+ /**
+ * preallocated and reused LongWritable objects for emiting row mode key values
+ */
+ private LongWritable[] longKeyValueOutput;
+
+ /**
+ * preallocated and reused DoubleWritable objects for emiting row mode key values
+ */
+ private DoubleWritable[] doubleKeyValueOutput;
+
+ /**
+ * Accessor for the batch-sized array of key wrappers
+ */
+ public VectorHashKeyWrapper[] getVectorHashKeyWrappers() {
+ return vectorHashKeyWrappers;
+ }
+
+ /**
+ * Processes a batch:
+ * <ul>
+ * <li>Evaluates each key vector expression.</li>
+ * <li>Copies out each key's primitive values into the key wrappers</li>
+ * <li>computes the hashcode of the key wrappers</li>
+ * </ul>
+ * @param vrb
+ * @throws HiveException
+ */
+ public void evaluateBatch (VectorizedRowBatch vrb) throws HiveException {
+ for(int i = 0; i < keyExpressions.length; ++i) {
+ keyExpressions[i].evaluate(vrb);
+ }
+ for(int i = 0; i< longIndices.length; ++i) {
+ int keyIndex = longIndices[i];
+ int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+ LongColumnVector columnVector = (LongColumnVector) vrb.cols[columnIndex];
+ if (columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+ assignLongNoNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+ } else if (columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+ assignLongNoNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+ } else if (columnVector.noNulls && columnVector.isRepeating) {
+ assignLongNoNullsRepeating(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+ assignLongNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && columnVector.isRepeating) {
+ assignLongNullsRepeating(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+ assignLongNullsNoRepeatingSelection (i, vrb.size, columnVector, vrb.selected);
+ } else {
+ throw new HiveException (String.format("Unimplemented Long null/repeat/selected combination %b/%b/%b",
+ columnVector.noNulls, columnVector.isRepeating, vrb.selectedInUse));
+ }
+ }
+ for(int i=0;i<doubleIndices.length; ++i) {
+ int keyIndex = doubleIndices[i];
+ int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+ DoubleColumnVector columnVector = (DoubleColumnVector) vrb.cols[columnIndex];
+ if (columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+ assignDoubleNoNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+ } else if (columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+ assignDoubleNoNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+ } else if (columnVector.noNulls && columnVector.isRepeating) {
+ assignDoubleNoNullsRepeating(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+ assignDoubleNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && columnVector.isRepeating) {
+ assignDoubleNullsRepeating(i, vrb.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+ assignDoubleNullsNoRepeatingSelection (i, vrb.size, columnVector, vrb.selected);
+ } else {
+ throw new HiveException (String.format("Unimplemented Double null/repeat/selected combination %b/%b/%b",
+ columnVector.noNulls, columnVector.isRepeating, vrb.selectedInUse));
+ }
+ }
+ for(int i=0;i<vrb.size;++i) {
+ vectorHashKeyWrappers[i].setHashKey();
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, possible nulls, no repeat values, batch selection vector.
+ */
+ private void assignDoubleNullsNoRepeatingSelection(int index, int size,
+ DoubleColumnVector columnVector, int[] selected) {
+ for(int r = 0; r < size; ++r) {
+ if (!columnVector.isNull[r]) {
+ vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[selected[r]]);
+ } else {
+ vectorHashKeyWrappers[r].assignNullDouble(index);
+ }
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for Double type, repeat null values.
+ */
+ private void assignDoubleNullsRepeating(int index, int size,
+ DoubleColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignNullDouble(index);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for Double type, possible nulls, repeat values.
+ */
+ private void assignDoubleNullsNoRepeatingNoSelection(int index, int size,
+ DoubleColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ if (!columnVector.isNull[r]) {
+ vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[r]);
+ } else {
+ vectorHashKeyWrappers[r].assignNullDouble(index);
+ }
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, no nulls, repeat values, no selection vector.
+ */
+ private void assignDoubleNoNullsRepeating(int index, int size, DoubleColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[0]);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, no nulls, no repeat values, batch selection vector.
+ */
+ private void assignDoubleNoNullsNoRepeatingSelection(int index, int size,
+ DoubleColumnVector columnVector, int[] selected) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[selected[r]]);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, no nulls, no repeat values, no selection vector.
+ */
+ private void assignDoubleNoNullsNoRepeatingNoSelection(int index, int size,
+ DoubleColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[r]);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, possible nulls, no repeat values, batch selection vector.
+ */
+ private void assignLongNullsNoRepeatingSelection(int index, int size,
+ LongColumnVector columnVector, int[] selected) {
+ for(int r = 0; r < size; ++r) {
+ if (!columnVector.isNull[selected[r]]) {
+ vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[selected[r]]);
+ } else {
+ vectorHashKeyWrappers[r].assignNullLong(index);
+ }
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, repeating nulls.
+ */
+ private void assignLongNullsRepeating(int index, int size,
+ LongColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignNullLong(index);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, possible nulls, no repeat values, no selection vector.
+ */
+ private void assignLongNullsNoRepeatingNoSelection(int index, int size,
+ LongColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ if (!columnVector.isNull[r]) {
+ vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[r]);
+ } else {
+ vectorHashKeyWrappers[r].assignNullLong(index);
+ }
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, no nulls, repeat values, no selection vector.
+ */
+ private void assignLongNoNullsRepeating(int index, int size, LongColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[0]);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, no nulls, no repeat values, batch selection vector.
+ */
+ private void assignLongNoNullsNoRepeatingSelection(int index, int size,
+ LongColumnVector columnVector, int[] selected) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[selected[r]]);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for double type, no nulls, no repeat values, no selection vector.
+ */
+ private void assignLongNoNullsNoRepeatingNoSelection(int index, int size,
+ LongColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[r]);
+ }
+ }
+
+ /**
+ * Prepares a VectorHashKeyWrapperBatch to work for a specific set of keys.
+ * Computes the fast access lookup indices, preallocates all needed internal arrays.
+ * This step is done only once per query, not once per batch. The information computed now
+ * will be used to generate proper individual VectorKeyHashWrapper objects.
+ */
+ public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(VectorExpression[] keyExpressions)
+ throws HiveException {
+ VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch();
+ compiledKeyWrapperBatch.keyExpressions = keyExpressions;
+
+ // We'll overallocate and then shrink the array for each type
+ int[] longIndices = new int[keyExpressions.length];
+ int longIndicesIndex = 0;
+ int[] doubleIndices = new int[keyExpressions.length];
+ int doubleIndicesIndex = 0;
+ KeyLookupHelper[] indexLookup = new KeyLookupHelper[keyExpressions.length];
+
+ // Inspect the output type of each key expression.
+ for(int i=0; i < keyExpressions.length; ++i) {
+ indexLookup[i] = new KeyLookupHelper();
+ String outputType = keyExpressions[i].getOutputType();
+ if (outputType.equalsIgnoreCase("long") ||
+ outputType.equalsIgnoreCase("bigint")) {
+ longIndices[longIndicesIndex] = i;
+ indexLookup[i].longIndex = longIndicesIndex;
+ indexLookup[i].doubleIndex = -1;
+ ++longIndicesIndex;
+ } else if (outputType.equalsIgnoreCase("double")) {
+ doubleIndices[doubleIndicesIndex] = i;
+ indexLookup[i].longIndex = -1;
+ indexLookup[i].doubleIndex = doubleIndicesIndex;
+ ++doubleIndicesIndex;
+ } else {
+ throw new HiveException("Unsuported vector output type: " + outputType);
+ }
+ }
+ compiledKeyWrapperBatch.indexLookup = indexLookup;
+ compiledKeyWrapperBatch.longKeyValueOutput = new LongWritable[longIndicesIndex];
+ for (int i=0; i < longIndicesIndex; ++i) {
+ compiledKeyWrapperBatch.longKeyValueOutput[i] = new LongWritable();
+ }
+ compiledKeyWrapperBatch.doubleKeyValueOutput = new DoubleWritable[doubleIndicesIndex];
+ for (int i=0; i < doubleIndicesIndex; ++i) {
+ compiledKeyWrapperBatch.doubleKeyValueOutput[i] = new DoubleWritable();
+ }
+ compiledKeyWrapperBatch.longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
+ compiledKeyWrapperBatch.doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
+ compiledKeyWrapperBatch.vectorHashKeyWrappers =
+ new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
+ for(int i=0;i<VectorizedRowBatch.DEFAULT_SIZE; ++i) {
+ compiledKeyWrapperBatch.vectorHashKeyWrappers[i] =
+ new VectorHashKeyWrapper(longIndicesIndex, doubleIndicesIndex);
+ }
+ return compiledKeyWrapperBatch;
+ }
+
+ /**
+ * Get the row-mode writable object value of a key from a key wrapper
+ */
+ public Object getWritableKeyValue(VectorHashKeyWrapper kw, int i)
+ throws HiveException {
+ if (kw.getIsNull(i)) {
+ return null;
+ }
+ KeyLookupHelper klh = indexLookup[i];
+ if (klh.longIndex >= 0) {
+ longKeyValueOutput[klh.longIndex].set(kw.getLongValue(i));
+ return longKeyValueOutput[klh.longIndex];
+ } else if (klh.doubleIndex >= 0) {
+ doubleKeyValueOutput[klh.doubleIndex].set(kw.getDoubleValue(i));
+ return doubleKeyValueOutput[klh.doubleIndex];
+ } else {
+ throw new HiveException(String.format(
+ "Internal inconsistent KeyLookupHelper at index [%d]:%d %d",
+ i, klh.longIndex, klh.doubleIndex));
+ }
+ }
+}
+
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java?rev=1485419&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java Wed May 22 20:58:08 2013
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+/**
+ * Associates each row of the current batch with the aggregation buffer set (key) it feeds,
+ * and counts how many distinct buffer sets the batch touches.
+ */
+public class VectorAggregationBufferBatch {
+
+  /**
+   * Batch-sized array indexed by row number: entry r is the aggregation buffer set assigned
+   * to row r by {@link #mapAggregationBufferSet}. The array is reused across batches and is
+   * never cleared, so entries for rows not mapped in the current batch are stale.
+   */
+  private VectorAggregationBufferRow[] aggregationBuffers;
+
+  /**
+   * Batch-sized int array exposed via {@link #getSelectionVector()}.
+   * NOTE(review): this class only allocates it and never writes to it.
+   */
+  private int[] selection;
+
+  /**
+   * Batch counter, incremented by {@link #startBatch()}. Buffer sets remember the counter
+   * value of the last batch that used them, so the first use of a set within a batch can
+   * be detected without clearing anything.
+   */
+  private int version;
+
+  /** Number of distinct aggregation buffer sets mapped so far in the current batch. */
+  private int distinctCount;
+
+  /** Allocates the batch-sized arrays (VectorizedRowBatch.DEFAULT_SIZE entries each). */
+  public VectorAggregationBufferBatch() {
+    final int capacity = VectorizedRowBatch.DEFAULT_SIZE;
+    this.aggregationBuffers = new VectorAggregationBufferRow[capacity];
+    this.selection = new int[capacity];
+  }
+
+  /**
+   * Returns the row-indexed array of aggregation buffer sets. Only the entries for rows
+   * mapped during the current batch are meaningful.
+   */
+  public VectorAggregationBufferRow[] getAggregationBuffers() {
+    return aggregationBuffers;
+  }
+
+  /** Returns the number of distinct aggregation buffer sets (keys) mapped in the current batch. */
+  public int getDistinctBufferSetCount () {
+    return distinctCount;
+  }
+
+  /** Returns the batch-sized selection array (see the field note: not populated here). */
+  public int[] getSelectionVector() {
+    return selection;
+  }
+
+  /** Begins a new batch: advances the version counter and resets the distinct-set count. */
+  public void startBatch() {
+    ++version;
+    distinctCount = 0;
+  }
+
+  /**
+   * Assigns {@code bufferSet} to batch row {@code row}. The first time a given buffer set
+   * is seen in the current batch, it is stamped with the current version and given the
+   * next distinct index, and the distinct count grows by one.
+   */
+  public void mapAggregationBufferSet(VectorAggregationBufferRow bufferSet, int row) {
+    final boolean firstUseInBatch = bufferSet.getVersion() != version;
+    if (firstUseInBatch) {
+      bufferSet.setVersionAndIndex(version, distinctCount++);
+    }
+    aggregationBuffers[row] = bufferSet;
+  }
+
+}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java?rev=1485419&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java Wed May 22 20:58:08 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+
+/**
+ * Represents a set of aggregation buffers to be used for a specific key for UDAF GROUP BY.
+ * <p>
+ * Each set also carries a (version, index) pair written by
+ * {@link VectorAggregationBufferBatch}. When the version matches the batch currently being
+ * processed, the index is this set's distinct position within that batch.
+ */
+public class VectorAggregationBufferRow {
+ /** The buffers of this set, one per aggregation expression; never reassigned. */
+ private final VectorAggregateExpression.AggregationBuffer[] aggregationBuffers;
+ /** Batch version at which {@link #index} was last assigned (0 until first set). */
+ private int version;
+ /** Distinct index cached by VectorAggregationBufferBatch for {@link #version}. */
+ private int index;
+
+ /**
+ * Creates a buffer set around the given buffers.
+ * @param aggregationBuffers the buffers, one per aggregation expression; the array is
+ * stored directly, not copied
+ */
+ public VectorAggregationBufferRow(
+ VectorAggregateExpression.AggregationBuffer[] aggregationBuffers) {
+ this.aggregationBuffers = aggregationBuffers;
+ }
+
+ /**
+ * Returns the aggregation buffer for an aggregation expression, by index.
+ * @param bufferIndex position of the aggregation expression within this set
+ * @return the buffer stored at {@code bufferIndex}
+ */
+ public VectorAggregateExpression.AggregationBuffer getAggregationBuffer(int bufferIndex) {
+ return aggregationBuffers[bufferIndex];
+ }
+
+ /**
+ * Returns the array of aggregation buffers (the entire set).
+ * @return the internal array itself, not a copy
+ */
+ public VectorAggregateExpression.AggregationBuffer[] getAggregationBuffers() {
+ return aggregationBuffers;
+ }
+
+ /**
+ * Versioning used to detect staleness of the index cached for the benefit of
+ * {@link org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferBatch VectorAggregationBufferBatch}.
+ * @return the batch version at which the cached index was assigned (0 if never set)
+ */
+ public int getVersion() {
+ return version;
+ }
+
+ /**
+ * Returns the index cached by VectorAggregationBufferBatch. It is only meaningful when
+ * {@link #getVersion()} equals that batch's current version.
+ * @return the cached distinct index
+ */
+ public int getIndex() {
+ return index;
+ }
+
+ /**
+ * Accessor for VectorAggregationBufferBatch to set its caching info on this set.
+ * @param version the batch version the index belongs to
+ * @param index the distinct index assigned within that batch
+ */
+ public void setVersionAndIndex(int version, int index) {
+ this.index = index;
+ this.version = version;
+ }
+
+}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Wed May 22 20:58:08 2013
@@ -20,14 +20,20 @@ package org.apache.hadoop.hive.ql.exec.v
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapper;
+import org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
- .VectorAggregateExpression.AggregationBuffer;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
@@ -37,19 +43,44 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
/**
- * Vectorized GROUP BY operator impelementation. Consumes the vectorized input and
- * stores the aggregates operators intermediate states. Emits row mode output.
+ * Vectorized GROUP BY operator implementation. Consumes the vectorized input and
+ * stores the aggregate operators' intermediate states. Emits row mode output.
*
*/
public class VectorGroupByOperator extends Operator<GroupByDesc> implements Serializable {
+ /** Logger for this operator. */
+ private static final Log LOG = LogFactory.getLog(
+ VectorGroupByOperator.class.getName());
+
private final VectorizationContext vContext;
- protected transient VectorAggregateExpression[] aggregators;
- protected transient AggregationBuffer[] aggregationBuffers;
+ /**
+ * This is the vector of aggregators. They are stateless and only implement
+ * the algorithm of how to compute the aggregation. State is kept in the
+ * aggregation buffers, and it is this operator's responsibility to match the
+ * proper state to each key.
+ */
+ private transient VectorAggregateExpression[] aggregators;
+
+ /**
+ * Key vector expressions: one per GROUP BY key, built from the descriptor's keys.
+ */
+ private transient VectorExpression[] keyExpressions;
+
+ /**
+ * The aggregation buffers to use for the current batch: the per-row mapping of rows to
+ * aggregation buffer sets, restarted at the beginning of every batch.
+ */
+ private transient VectorAggregationBufferBatch aggregationBatchInfo;
+
+ /**
+ * The current batch key wrappers.
+ * The very same instance gets reused for all batches, so its wrappers must be
+ * copied before being stored as hash map keys.
+ */
+ private transient VectorHashKeyWrapperBatch keyWrappersBatch;
- transient int heartbeatInterval;
- transient int countAfterReport;
+ /**
+ * The global key-aggregation hash map. Each distinct key seen so far, stored as a
+ * copied key wrapper, maps to its aggregation buffer set.
+ */
+ private transient Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
private static final long serialVersionUID = 1L;
@@ -68,17 +99,22 @@ public class VectorGroupByOperator exten
vContext.setOperatorType(OperatorType.GROUPBY);
ArrayList<AggregationDesc> aggrDesc = conf.getAggregators();
+ keyExpressions = vContext.getVectorExpressions(conf.getKeys());
+
+ for(int i = 0; i < keyExpressions.length; ++i) {
+ objectInspectors.add(vContext.createObjectInspector(keyExpressions[i]));
+ }
aggregators = new VectorAggregateExpression[aggrDesc.size()];
- aggregationBuffers = new AggregationBuffer[aggrDesc.size()];
for (int i = 0; i < aggrDesc.size(); ++i) {
AggregationDesc desc = aggrDesc.get(i);
aggregators[i] = vContext.getAggregatorExpression (desc);
- aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
- aggregators[i].reset(aggregationBuffers[i]);
-
objectInspectors.add(aggregators[i].getOutputObjectInspector());
}
+
+ keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+ aggregationBatchInfo = new VectorAggregationBufferBatch();
+ mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
List<String> outputFieldNames = conf.getOutputColumnNames();
outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
@@ -94,22 +130,120 @@ public class VectorGroupByOperator exten
+ /**
+ * Processes one vectorized batch. It evaluates the keys, maps every row to its
+ * aggregation buffer set, and then runs the aggregators.
+ * @param row the input; cast to {@link VectorizedRowBatch}
+ * @param tag not used by this method
+ */
@Override
public void processOp(Object row, int tag) throws HiveException {
- VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+ VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+ // First we traverse the batch to evaluate and prepare the KeyWrappers
+ // After this the KeyWrappers are properly set and hash code is computed
+ keyWrappersBatch.evaluateBatch(batch);
+
+ // Next we locate the aggregation buffer set for each key
+ prepareBatchAggregationBufferSets(batch);
+
+ // Finally, evaluate the aggregators
+ processAggregators(batch);
+ }
+
+ /**
+ * Runs every aggregator over the current batch.
+ * {@link #prepareBatchAggregationBufferSets} must already have been called for this batch.
+ * When the whole batch maps to a single buffer set, the cheaper {@code aggregateInput}
+ * path is used. Otherwise each aggregator receives the per-row buffer sets through
+ * {@code aggregateInputSelection}.
+ */
+ private void processAggregators(VectorizedRowBatch batch) throws HiveException {
+ VectorAggregationBufferRow[] rowBufferSets = aggregationBatchInfo.getAggregationBuffers();
+ boolean singleKey = aggregationBatchInfo.getDistinctBufferSetCount() == 1;
+ if (!singleKey) {
+ for (int aggIndex = 0; aggIndex < aggregators.length; ++aggIndex) {
+ aggregators[aggIndex].aggregateInputSelection(rowBufferSets, aggIndex, batch);
+ }
+ return;
+ }
+ // Every row shares one buffer set, so the one recorded for row 0 serves the whole batch.
+ VectorAggregateExpression.AggregationBuffer[] sharedBuffers =
+ rowBufferSets[0].getAggregationBuffers();
+ for (int aggIndex = 0; aggIndex < aggregators.length; ++aggIndex) {
+ aggregators[aggIndex].aggregateInput(sharedBuffers[aggIndex], batch);
+ }
+ }
+
+ /**
+ * Finds (or creates) the aggregation buffer set for every row of the current batch and
+ * registers it with aggregationBatchInfo.
+ * The keyWrappersBatch must already have evaluated this batch.
+ */
+ private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException {
+ // Bump the batch version so buffer-set indices cached during earlier batches are stale.
+ aggregationBatchInfo.startBatch();
+
+ VectorHashKeyWrapper[] wrappers = keyWrappersBatch.getVectorHashKeyWrappers();
+ for (int row = 0; row < batch.size; ++row) {
+ VectorHashKeyWrapper wrapper = wrappers[row];
+ VectorAggregationBufferRow bufferSet = mapKeysAggregationBuffers.get(wrapper);
+ if (bufferSet == null) {
+ // First time this key is seen: allocate buffers and store them under a COPY of
+ // the key wrapper, because keyWrappersBatch reuses its wrappers on the next batch.
+ bufferSet = allocateAggregationBuffer();
+ mapKeysAggregationBuffers.put(wrapper.copyKey(), bufferSet);
+ }
+ aggregationBatchInfo.mapAggregationBufferSet(bufferSet, row);
+ }
+ }
- //TODO: proper group by hash
- for (int i = 0; i < aggregators.length; ++i) {
- aggregators[i].aggregateInput(aggregationBuffers[i], vrg);
+ /**
+ * Allocates a new aggregation buffer set: one buffer per aggregator, obtained from
+ * {@code getNewAggregationBuffer()} and reset before it is returned.
+ * @return a buffer set wrapping the freshly reset buffers
+ * @throws HiveException if an aggregator fails to create or reset its buffer
+ */
+ private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+ VectorAggregateExpression.AggregationBuffer[] aggregationBuffers =
+ new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+ for (int i=0; i < aggregators.length; ++i) {
+ aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+ aggregators[i].reset(aggregationBuffers[i]);
}
+ VectorAggregationBufferRow bufferSet = new VectorAggregationBufferRow(aggregationBuffers);
+ return bufferSet;
}
+ /**
+ * Emits the final aggregation results when the operator closes normally.
+ * <p>
+ * For a global aggregation (no GROUP BY keys) over an empty input, one row is still
+ * emitted, computed from freshly reset buffers. Otherwise one row is emitted per entry of
+ * the key-to-buffers map: the key values first, then each aggregator's output.
+ * Nothing is emitted when {@code aborted} is true.
+ */
@Override
public void closeOp(boolean aborted) throws HiveException {
if (!aborted) {
- Object[] forwardCache = new Object[aggregators.length];
- for (int i = 0; i < aggregators.length; ++i) {
- forwardCache[i] = aggregators[i].evaluateOutput(aggregationBuffers[i]);
+ // The same array is refilled and forwarded for every output row.
+ Object[] forwardCache = new Object[keyExpressions.length + aggregators.length];
+ if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) {
+
+ // if this is a global aggregation (no keys) and empty set, must still emit NULLs
+ VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer();
+ for (int i = 0; i < aggregators.length; ++i) {
+ forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i));
+ }
+ forward(forwardCache, outputObjInspector);
+ } else {
+ // Building the debug string (String.format + Arrays.toString) for every output row
+ // is wasted work unless debug logging is actually enabled.
+ boolean debugEnabled = LOG.isDebugEnabled();
+
+ /* Iterate the global (keywrapper,aggregationbuffers) map and emit
+ a row for each key */
+ for (Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair :
+ mapKeysAggregationBuffers.entrySet()) {
+ int fi = 0;
+ for (int i = 0; i < keyExpressions.length; ++i) {
+ VectorHashKeyWrapper kw = (VectorHashKeyWrapper) pair.getKey();
+ forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue(kw, i);
+ }
+ VectorAggregationBufferRow bufferSet = pair.getValue();
+ for (int i = 0; i < aggregators.length; ++i) {
+ forwardCache[fi++] = aggregators[i].evaluateOutput(bufferSet.getAggregationBuffer(i));
+ }
+ if (debugEnabled) {
+ LOG.debug(String.format("forwarding keys: %s: %s",
+ pair.getKey().toString(), Arrays.toString(forwardCache)));
+ }
+ forward(forwardCache, outputObjInspector);
+ }
}
- forward(forwardCache, outputObjInspector);
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Wed May 22 20:58:08 2013
@@ -86,6 +86,7 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
/**
@@ -209,6 +210,9 @@ public class VectorizationContext {
public VectorExpression[] getVectorExpressions(List<ExprNodeDesc> exprNodes) throws HiveException {
int i = 0;
+ if (null == exprNodes) {
+ return new VectorExpression[0];
+ }
VectorExpression[] ret = new VectorExpression[exprNodes.size()];
for (ExprNodeDesc e : exprNodes) {
ret[i++] = getVectorExpression(e);
@@ -1058,5 +1062,18 @@ public class VectorizationContext {
return new LongColumnVector(defaultSize);
}
}
+
+ /**
+ * Returns the writable primitive object inspector matching the output type of a
+ * vector expression.
+ * @param vectorExpression expression whose {@code getOutputType()} is inspected
+ * @return the writable long inspector for "long"/"bigint", or the writable double
+ * inspector for "double" (the comparison ignores case)
+ * @throws HiveException for any other output type
+ */
+ public ObjectInspector createObjectInspector(VectorExpression vectorExpression)
+ throws HiveException {
+ String columnType = vectorExpression.getOutputType();
+ boolean isLong = columnType.equalsIgnoreCase("long")
+ || columnType.equalsIgnoreCase("bigint");
+ if (isLong) {
+ return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ }
+ if (columnType.equalsIgnoreCase("double")) {
+ return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ }
+ throw new HiveException(String.format("Must implement type %s", columnType));
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java Wed May 22 20:58:08 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -30,6 +31,8 @@ public abstract class VectorAggregateExp
public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
public abstract void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
throws HiveException;
+ /**
+ * Aggregates a batch whose rows may belong to different keys. For each row position
+ * {@code r}, the buffer updated is presumably
+ * {@code aggregationBufferSets[r].getAggregationBuffer(aggregateIndex)}. That is what the
+ * vectorized AVG implementations do; TODO confirm for every implementation.
+ * @param aggregationBufferSets per-row buffer sets, indexed by row position in the batch
+ * @param aggregateIndex index of this aggregator's buffer inside each buffer set
+ * @param vrg the input batch
+ */
+ public abstract void aggregateInputSelection(VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex, VectorizedRowBatch vrg) throws HiveException;
public abstract void reset(AggregationBuffer agg) throws HiveException;
public abstract Object evaluateOutput(AggregationBuffer agg) throws HiveException;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java Wed May 22 20:58:08 2013
@@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.exec.De
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
- VectorAggregateExpression.AggregationBuffer;
+ VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -42,19 +43,27 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-/**
-* VectorUDAFAvgDouble. Vectorized implementation for AVG aggregates.
-*/
+import org.apache.hadoop.hive.ql.io.orc.*;
+
@Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: double)")
public class VectorUDAFAvgDouble extends VectorAggregateExpression {
- /**
- /* class for storing the current aggregate value.
- */
- static private final class Aggregation implements AggregationBuffer {
+ /**
+ * Class for storing the current aggregate value: the running sum and count, plus a flag
+ * that is true while no value has been accumulated yet.
+ */
+ static class Aggregation implements AggregationBuffer {
double sum;
long count;
boolean isNull;
+
+ /**
+ * Accumulates one value. If the buffer is still marked null, the value replaces
+ * sum/count instead of being added to them, and the null flag is cleared.
+ * @param value the input value to accumulate
+ */
+ public void sumValue(double value) {
+ if (isNull) {
+ sum = value;
+ count = 1;
+ isNull = false;
+ } else {
+ sum += value;
+ count++;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -67,14 +76,16 @@ public class VectorUDAFAvgDouble extends
super();
this.inputExpression = inputExpression;
partialResult = new Object[2];
- partialResult[0] = resultCount = new LongWritable();
- partialResult[1] = resultSum = new DoubleWritable();
+ resultCount = new LongWritable();
+ resultSum = new DoubleWritable();
+ partialResult[0] = resultCount;
+ partialResult[1] = resultSum;
initPartialResultInspector();
}
- private void initPartialResultInspector () {
- ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+ private void initPartialResultInspector() {
+ ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
ArrayList<String> fname = new ArrayList<String>();
@@ -83,51 +94,238 @@ public class VectorUDAFAvgDouble extends
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+ /**
+ * Returns this aggregator's buffer for the given batch row.
+ * @param aggregationBufferSets per-row buffer sets for the current batch
+ * @param bufferIndex position of this aggregator's buffer within each set
+ * @param row row position within the batch
+ * @return the row's buffer, cast to this class's Aggregation type
+ */
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ int row) {
+ return (Aggregation) aggregationBufferSets[row].getAggregationBuffer(bufferIndex);
+ }
+ /**
+ * Aggregates a batch in which rows may belong to different keys. Each row's value is
+ * summed into {@code aggregationBufferSets[row].getAggregationBuffer(bufferIndex)}.
+ * <p>
+ * The input column is read as a {@link DoubleColumnVector}, matching
+ * {@link #aggregateInput}. Casting it to LongColumnVector, as before, throws
+ * ClassCastException for double input.
+ */
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
- throws HiveException {
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ VectorizedRowBatch batch) throws HiveException {
- inputExpression.evaluate(unit);
-
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
- cols[this.inputExpression.getOutputColumn()];
-
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
}
- Aggregation myagg = (Aggregation)agg;
-
- double[] vector = inputVector.vector;
+ inputExpression.evaluate(batch);
- if (inputVector.isRepeating) {
- if (inputVector.noNulls || !inputVector.isNull[0]) {
- if (myagg.isNull) {
- myagg.isNull = false;
- myagg.sum = 0;
- myagg.count = 0;
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ double[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, inputVector.isNull);
}
- myagg.sum += vector[0]*batchSize;
- myagg.count += batchSize;
}
- return;
}
-
- if (!unit.selectedInUse && inputVector.noNulls) {
- iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ double value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
}
- else if (!unit.selectedInUse) {
- iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ double[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[selection[i]]);
}
- else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ double[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
}
- else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ double value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ // A repeating vector carries its single null flag in isNull[0]; other entries are not
+ // valid and must not be consulted (previously isNull[selection[i]] was read).
+ // selection is not needed here: buffer sets are indexed by batch position.
+ if (isNull[0]) {
+ return;
+ }
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
}
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ double value,
+ int batchSize,
+ boolean[] isNull) {
+
+ // Only isNull[0] is meaningful for a repeating vector (previously isNull[i] was read).
+ if (isNull[0]) {
+ return;
+ }
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ double[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ j);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ double[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+
+ /**
+ * Aggregates a whole batch into a single buffer. VectorGroupByOperator uses this path
+ * when every row of the batch maps to the same key.
+ */
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ double[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+ myagg.sum += vector[0]*batchSize;
+ myagg.count += batchSize;
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
+ }
}
private void iterateSelectionHasNulls(
@@ -236,7 +434,7 @@ public class VectorUDAFAvgDouble extends
+ /**
+ * Returns the struct object inspector built by initPartialResultInspector()
+ * (a writable long field followed by a writable double field).
+ */
@Override
public ObjectInspector getOutputObjectInspector() {
- return soi;
- }
+ return soi;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java Wed May 22 20:58:08 2013
@@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.exec.De
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
- VectorAggregateExpression.AggregationBuffer;
+ VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -42,19 +43,27 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-/**
-* VectorUDAFAvgLong. Vectorized implementation for AVG aggregates.
-*/
+import org.apache.hadoop.hive.ql.io.orc.*;
+
@Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: long)")
public class VectorUDAFAvgLong extends VectorAggregateExpression {
- /**
- /* class for storing the current aggregate value.
- */
- static private final class Aggregation implements AggregationBuffer {
+ /**
+ * Class for storing the current aggregate value: the running sum and count, plus a flag
+ * that is true while no value has been accumulated yet.
+ */
+ static class Aggregation implements AggregationBuffer {
long sum;
long count;
boolean isNull;
+
+ /**
+ * Accumulates one value. If the buffer is still marked null, the value replaces
+ * sum/count instead of being added to them, and the null flag is cleared.
+ * NOTE(review): sum is a long, so very large inputs can overflow silently.
+ * @param value the input value to accumulate
+ */
+ public void sumValue(long value) {
+ if (isNull) {
+ sum = value;
+ count = 1;
+ isNull = false;
+ } else {
+ sum += value;
+ count++;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -67,14 +76,16 @@ public class VectorUDAFAvgLong extends V
super();
this.inputExpression = inputExpression;
partialResult = new Object[2];
- partialResult[0] = resultCount = new LongWritable();
- partialResult[1] = resultSum = new DoubleWritable();
+ resultCount = new LongWritable();
+ resultSum = new DoubleWritable();
+ partialResult[0] = resultCount;
+ partialResult[1] = resultSum;
initPartialResultInspector();
}
- private void initPartialResultInspector () {
- ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+ private void initPartialResultInspector() {
+ ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
ArrayList<String> fname = new ArrayList<String>();
@@ -83,51 +94,238 @@ public class VectorUDAFAvgLong extends V
soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
}
+ /**
+ * Returns this aggregator's buffer for the given batch row.
+ * @param aggregationBufferSets per-row buffer sets for the current batch
+ * @param bufferIndex position of this aggregator's buffer within each set
+ * @param row row position within the batch
+ * @return the row's buffer, cast to this class's Aggregation type
+ */
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ int row) {
+ return (Aggregation) aggregationBufferSets[row].getAggregationBuffer(bufferIndex);
+ }
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
- throws HiveException {
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ VectorizedRowBatch batch) throws HiveException {
- inputExpression.evaluate(unit);
-
- LongColumnVector inputVector = (LongColumnVector)unit.
- cols[this.inputExpression.getOutputColumn()];
-
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
}
- Aggregation myagg = (Aggregation)agg;
-
- long[] vector = inputVector.vector;
+ inputExpression.evaluate(batch);
- if (inputVector.isRepeating) {
- if (inputVector.noNulls || !inputVector.isNull[0]) {
- if (myagg.isNull) {
- myagg.isNull = false;
- myagg.sum = 0;
- myagg.count = 0;
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+ long[] vector = inputVector.vector;
+
+ if (inputVector.noNulls) {
+ if (inputVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize);
+ }
+ }
+ } else {
+ if (inputVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector[0], batchSize, inputVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, batch.selected, inputVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, bufferIndex,
+ vector, batchSize, inputVector.isNull);
}
- myagg.sum += vector[0]*batchSize;
- myagg.count += batchSize;
}
- return;
}
-
- if (!unit.selectedInUse && inputVector.noNulls) {
- iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ }
+
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
}
- else if (!unit.selectedInUse) {
- iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[selection[i]]);
}
- else if (inputVector.noNulls){
- iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+ }
+
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
}
- else {
- iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+ }
+
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
}
+
+ }
+
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long value,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(value);
+ }
+ }
+ }
+
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ j);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int bufferIndex,
+ long[] values,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ bufferIndex,
+ i);
+ myagg.sumValue(values[i]);
+ }
+ }
+ }
+
+
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ long[] vector = inputVector.vector;
+
+ if (inputVector.isRepeating) {
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ myagg.sum = 0;
+ myagg.count = 0;
+ }
+ myagg.sum += vector[0]*batchSize;
+ myagg.count += batchSize;
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputVector.noNulls) {
+ iterateNoSelectionNoNulls(myagg, vector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+ }
+ else if (inputVector.noNulls){
+ iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
+ }
}
private void iterateSelectionHasNulls(
@@ -236,7 +434,7 @@ public class VectorUDAFAvgLong extends V
@Override
public ObjectInspector getOutputObjectInspector() {
- return soi;
- }
+ return soi;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java Wed May 22 20:58:08 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -52,6 +53,13 @@ public class VectorUDAFCountDouble exten
static class Aggregation implements AggregationBuffer {
long value;
boolean isNull;
+
+ /**
+ * If the buffer is still flagged null, clears the flag and sets the count to 0;
+ * otherwise leaves it untouched. Callers invoke this before incrementing value.
+ */
+ public void initIfNull() {
+ if (isNull) {
+ isNull = false;
+ value = 0;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -62,17 +70,112 @@ public class VectorUDAFCountDouble exten
this.inputExpression = inputExpression;
result = new LongWritable(0);
}
+
+ /**
+ * Looks up this aggregate's buffer for one batch position.
+ *
+ * @param aggregationBufferSets per-row buffer sets, indexed by batch position
+ * @param aggregateIndex index of this aggregate inside each row's buffer set
+ * @param row batch position whose buffer is wanted
+ */
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ return (Aggregation) aggregationBufferSets[row].getAggregationBuffer(aggregateIndex);
+ }
+ /**
+ * Evaluates the input expression and adds one to the count of every row's
+ * buffer whose input value is non-null (GROUP BY path).
+ */
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputVector.isRepeating) {
+ // A repeating vector carries its single null flag in isNull[0] (aggregateInput
+ // checks it the same way); per-row isNull entries must not be consulted.
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, batchSize);
+ }
+ } else if (inputVector.noNulls) {
+ // if there are no nulls then the iteration is the same on all cases
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, batchSize);
+ } else if (!batch.selectedInUse) {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, inputVector.isNull);
+ } else {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, batch.selected, inputVector.isNull);
+ }
+ }
+
+ /** Counts every batch position once; used when the input holds no nulls. */
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize) {
+ int row = 0;
+ while (row < batchSize) {
+ Aggregation counter = getCurrentAggregationBuffer(aggregationBufferSets, aggregateIndex, row);
+ counter.initIfNull();
+ counter.value += 1;
+ row++;
+ }
+ }
+
+ /** Counts each batch position i whose isNull[i] flag is false (no selection vector). */
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ boolean[] isNull) {
+ for (int row = 0; row < batchSize; ++row) {
+ if (isNull[row]) {
+ continue;
+ }
+ Aggregation counter = getCurrentAggregationBuffer(aggregationBufferSets, aggregateIndex, row);
+ counter.initIfNull();
+ counter.value += 1;
+ }
+ }
+
+ /**
+ * Counts batch position pos when the selected entry isNull[selection[pos]] is
+ * false; the buffer is chosen by pos, the null flag by the selected index.
+ */
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+ for (int pos = 0; pos < batchSize; ++pos) {
+ if (isNull[selection[pos]]) {
+ continue;
+ }
+ Aggregation counter = getCurrentAggregationBuffer(aggregationBufferSets, aggregateIndex, pos);
+ counter.initIfNull();
+ counter.value += 1;
+ }
+ }
+
+
+ /**
+ * Evaluates the input expression and adds the number of non-null input values
+ * in the batch to the single buffer {@code agg}.
+ */
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+ DoubleColumnVector inputVector = (DoubleColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -80,11 +183,8 @@ public class VectorUDAFCountDouble exten
Aggregation myagg = (Aggregation)agg;
- if (myagg.isNull) {
- myagg.value = 0;
- myagg.isNull = false;
- }
-
+ myagg.initIfNull();
+
if (inputVector.isRepeating) {
if (inputVector.noNulls || !inputVector.isNull[0]) {
myagg.value += batchSize;
@@ -96,11 +196,11 @@ public class VectorUDAFCountDouble exten
myagg.value += batchSize;
return;
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
}
else {
- iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java Wed May 22 20:58:08 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -52,6 +53,13 @@ public class VectorUDAFCountLong extends
static class Aggregation implements AggregationBuffer {
long value;
boolean isNull;
+
+ /**
+ * If the buffer is still flagged null, clears the flag and sets the count to 0;
+ * otherwise leaves it untouched. Callers invoke this before incrementing value.
+ */
+ public void initIfNull() {
+ if (isNull) {
+ isNull = false;
+ value = 0;
+ }
+ }
}
private VectorExpression inputExpression;
@@ -62,17 +70,112 @@ public class VectorUDAFCountLong extends
this.inputExpression = inputExpression;
result = new LongWritable(0);
}
+
+ /**
+ * Looks up this aggregate's buffer for one batch position.
+ *
+ * @param aggregationBufferSets per-row buffer sets, indexed by batch position
+ * @param aggregateIndex index of this aggregate inside each row's buffer set
+ * @param row batch position whose buffer is wanted
+ */
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int row) {
+ return (Aggregation) aggregationBufferSets[row].getAggregationBuffer(aggregateIndex);
+ }
+ /**
+ * Evaluates the input expression and adds one to the count of every row's
+ * buffer whose input value is non-null (GROUP BY path).
+ */
@Override
- public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ LongColumnVector inputVector = (LongColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputVector.isRepeating) {
+ // A repeating vector carries its single null flag in isNull[0] (aggregateInput
+ // checks it the same way); per-row isNull entries must not be consulted.
+ if (inputVector.noNulls || !inputVector.isNull[0]) {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, batchSize);
+ }
+ } else if (inputVector.noNulls) {
+ // if there are no nulls then the iteration is the same on all cases
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex, batchSize);
+ } else if (!batch.selectedInUse) {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, inputVector.isNull);
+ } else {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregateIndex,
+ batchSize, batch.selected, inputVector.isNull);
+ }
+ }
+
+ /** Counts every batch position once; used when the input holds no nulls. */
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize) {
+ int row = 0;
+ while (row < batchSize) {
+ Aggregation counter = getCurrentAggregationBuffer(aggregationBufferSets, aggregateIndex, row);
+ counter.initIfNull();
+ counter.value += 1;
+ row++;
+ }
+ }
+
+ /** Counts each batch position i whose isNull[i] flag is false (no selection vector). */
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ boolean[] isNull) {
+ for (int row = 0; row < batchSize; ++row) {
+ if (isNull[row]) {
+ continue;
+ }
+ Aggregation counter = getCurrentAggregationBuffer(aggregationBufferSets, aggregateIndex, row);
+ counter.initIfNull();
+ counter.value += 1;
+ }
+ }
+
+ /**
+ * Counts batch position pos when the selected entry isNull[selection[pos]] is
+ * false; the buffer is chosen by pos, the null flag by the selected index.
+ */
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregateIndex,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+ for (int pos = 0; pos < batchSize; ++pos) {
+ if (isNull[selection[pos]]) {
+ continue;
+ }
+ Aggregation counter = getCurrentAggregationBuffer(aggregationBufferSets, aggregateIndex, pos);
+ counter.initIfNull();
+ counter.value += 1;
+ }
+ }
+
+
+ /**
+ * Evaluates the input expression and adds the number of non-null input values
+ * in the batch to the single buffer {@code agg}.
+ */
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
throws HiveException {
- inputExpression.evaluate(unit);
+ inputExpression.evaluate(batch);
- LongColumnVector inputVector = (LongColumnVector)unit.
+ LongColumnVector inputVector = (LongColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- int batchSize = unit.size;
+ int batchSize = batch.size;
if (batchSize == 0) {
return;
@@ -80,11 +183,8 @@ public class VectorUDAFCountLong extends
Aggregation myagg = (Aggregation)agg;
- if (myagg.isNull) {
- myagg.value = 0;
- myagg.isNull = false;
- }
-
+ myagg.initIfNull();
+
if (inputVector.isRepeating) {
if (inputVector.noNulls || !inputVector.isNull[0]) {
myagg.value += batchSize;
@@ -96,11 +196,11 @@ public class VectorUDAFCountLong extends
myagg.value += batchSize;
return;
}
- else if (!unit.selectedInUse) {
+ else if (!batch.selectedInUse) {
iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
}
else {
- iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected);
+ iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
}
}