You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2013/05/22 22:58:09 UTC

svn commit: r1485419 [1/4] - in /hive/branches/vectorization: metastore/src/java/org/apache/hadoop/hive/metastore/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ ql/src/java/org/apache/hadoop/hive/ql/exec...

Author: omalley
Date: Wed May 22 20:58:08 2013
New Revision: 1485419

URL: http://svn.apache.org/r1485419
Log:
HIVE-4450 Extend Vector Aggregates to support GROUP BY (Remus Rusanu via 
omalley)

Added:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFAvg.txt
Modified:
    hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdPopLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFStdSampLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFSumLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarPopLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFVarSampLong.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMax.txt
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFSum.txt
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFVar.txt
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java

Modified: hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (original)
+++ hive/branches/vectorization/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java Wed May 22 20:58:08 2013
@@ -51,7 +51,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.serde.serdeConstants;;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java?rev=1485419&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapper.java Wed May 22 20:58:08 2013
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * A hash map key wrapper for vectorized processing.
+ * It stores the key values as primitives in arrays for each supported primitive type.
+ * This works in conjunction with 
+ * {@link org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch VectorHashKeyWrapperBatch}
+ * to hash vectorized processing units (batches). 
+ */
+public class VectorHashKeyWrapper extends KeyWrapper {
+  
+  private long[] longValues;
+  private double[] doubleValues;
+  private boolean[] isNull;
+  private int hashcode;
+  
+  public VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount) {
+    longValues = new long[longValuesCount];
+    doubleValues = new double[doubleValuesCount];
+    isNull = new boolean[longValuesCount + doubleValuesCount];
+  }
+  
+  private VectorHashKeyWrapper() {
+  }
+
+  @Override
+  void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException {
+    throw new HiveException("Should not be called");
+  }
+
+  @Override
+  void setHashKey() {
+    hashcode = Arrays.hashCode(longValues) ^
+        Arrays.hashCode(doubleValues) ^
+        Arrays.hashCode(isNull);
+  }
+  
+  @Override
+  public int hashCode() {
+    return hashcode;
+  }
+  
+  @Override 
+  public boolean equals(Object that) {
+    if (that instanceof VectorHashKeyWrapper) {
+      VectorHashKeyWrapper keyThat = (VectorHashKeyWrapper)that;
+      return hashcode == keyThat.hashcode &&
+          Arrays.equals(longValues, keyThat.longValues) &&
+          Arrays.equals(doubleValues, keyThat.doubleValues) &&
+          Arrays.equals(isNull, keyThat.isNull);
+    }
+    return false;
+  }
+  
+  @Override
+  protected Object clone() {
+    VectorHashKeyWrapper clone = new VectorHashKeyWrapper();
+    clone.longValues = longValues.clone();
+    clone.doubleValues = doubleValues.clone();
+    clone.isNull = isNull.clone();
+    clone.hashcode = hashcode;
+    return clone;
+  }
+
+  @Override
+  public KeyWrapper copyKey() {
+    return (KeyWrapper) clone();
+  }
+
+  @Override
+  void copyKey(KeyWrapper oldWrapper) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  Object[] getKeyArray() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public void assignDouble(int index, double d) {
+    doubleValues[index] = d;
+    isNull[longValues.length + index] = false;
+  }
+
+  public void assignNullDouble(int index) {
+    doubleValues[index] = 0; // assign 0 to simplify hashcode
+    isNull[longValues.length + index] = true;
+  }
+
+  public void assignLong(int index, long v) {
+    longValues[index] = v;
+    isNull[index] = false;
+  }
+
+  public void assignNullLong(int index) {
+    longValues[index] = 0; // assign 0 to simplify hashcode
+    isNull[index] = true;
+  }
+  
+  @Override
+  public String toString() 
+  {
+    return String.format("%d[%s] %d[%s]", 
+        longValues.length, Arrays.toString(longValues),
+        doubleValues.length, Arrays.toString(doubleValues));
+  }
+
+  public boolean getIsNull(int i) {
+    return isNull[i];
+  }
+  
+  public long getLongValue(int i) {
+    return longValues[i];
+  }
+
+  public double getDoubleValue(int i) {
+    return doubleValues[i - longValues.length];
+  }
+
+}

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java?rev=1485419&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/VectorHashKeyWrapperBatch.java Wed May 22 20:58:08 2013
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Class for handling vectorized hash map key wrappers. It evaluates the key columns in a 
+ * row batch in a vectorized fashion.
+ * This class stores additional information about keys needed to evaluate and output the key values.
+ *
+ */
+public class VectorHashKeyWrapperBatch {
+  
+  /**
+   * Helper class for looking up a key value based on key index
+   *
+   */
+  private static class KeyLookupHelper {
+    public int longIndex;
+    public int doubleIndex;
+  }
+  
+  /**
+   * The key expressions that require evaluation and output the primitive values for each key.
+   */
+  private VectorExpression[] keyExpressions;
+  
+  /**
+   * indices of LONG primitive keys
+   */
+  private int[] longIndices;
+  
+  /**
+   * indices of DOUBLE primitive keys
+   */
+  private int[] doubleIndices;
+  
+  /**
+   * pre-allocated batch size vector of keys wrappers. 
+   * N.B. these keys are **mutable** and should never be used in a HashMap.
+   * Always clone the key wrapper to obtain an immutable keywrapper suitable 
+   * to use a key in a HashMap.
+   */
+  private VectorHashKeyWrapper[] vectorHashKeyWrappers;
+  
+  /**
+   * lookup vector to map from key index to primitive type index
+   */
+  private KeyLookupHelper[] indexLookup;
+  
+  /**
+   * preallocated and reused LongWritable objects for emiting row mode key values 
+   */
+  private LongWritable[] longKeyValueOutput;
+  
+  /**
+   * preallocated and reused DoubleWritable objects for emiting row mode key values
+   */
+  private DoubleWritable[] doubleKeyValueOutput;
+  
+  /**
+   * Accessor for the batch-sized array of key wrappers 
+   */
+  public VectorHashKeyWrapper[] getVectorHashKeyWrappers() {
+    return vectorHashKeyWrappers;
+  }
+  
+  /**
+   * Processes a batch:
+   * <ul>
+   * <li>Evaluates each key vector expression.</li>
+   * <li>Copies out each key's primitive values into the key wrappers</li>
+   * <li>computes the hashcode of the key wrappers</li>
+   * </ul>
+   * @param vrb
+   * @throws HiveException
+   */
+  public void evaluateBatch (VectorizedRowBatch vrb) throws HiveException {
+    for(int i = 0; i < keyExpressions.length; ++i) {
+      keyExpressions[i].evaluate(vrb);
+    }
+    for(int i = 0; i< longIndices.length; ++i) {
+      int keyIndex = longIndices[i];
+      int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      LongColumnVector columnVector = (LongColumnVector) vrb.cols[columnIndex];
+      if (columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+        assignLongNoNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+      } else if (columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+        assignLongNoNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+      } else if (columnVector.noNulls && columnVector.isRepeating) {
+        assignLongNoNullsRepeating(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+        assignLongNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && columnVector.isRepeating) {
+        assignLongNullsRepeating(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+        assignLongNullsNoRepeatingSelection (i, vrb.size, columnVector, vrb.selected);
+      } else {
+        throw new HiveException (String.format("Unimplemented Long null/repeat/selected combination %b/%b/%b",
+            columnVector.noNulls, columnVector.isRepeating, vrb.selectedInUse));
+      }
+    }
+    for(int i=0;i<doubleIndices.length; ++i) {
+      int keyIndex = doubleIndices[i];
+      int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+      DoubleColumnVector columnVector = (DoubleColumnVector) vrb.cols[columnIndex];
+      if (columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+        assignDoubleNoNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+      } else if (columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+        assignDoubleNoNullsNoRepeatingSelection(i, vrb.size, columnVector, vrb.selected);
+      } else if (columnVector.noNulls && columnVector.isRepeating) {
+        assignDoubleNoNullsRepeating(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && !columnVector.isRepeating && !vrb.selectedInUse) {
+        assignDoubleNullsNoRepeatingNoSelection(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && columnVector.isRepeating) {
+        assignDoubleNullsRepeating(i, vrb.size, columnVector);
+      } else if (!columnVector.noNulls && !columnVector.isRepeating && vrb.selectedInUse) {
+        assignDoubleNullsNoRepeatingSelection (i, vrb.size, columnVector, vrb.selected);
+      } else {
+        throw new HiveException (String.format("Unimplemented Double null/repeat/selected combination %b/%b/%b",
+            columnVector.noNulls, columnVector.isRepeating, vrb.selectedInUse));
+      }
+    }
+    for(int i=0;i<vrb.size;++i) {
+      vectorHashKeyWrappers[i].setHashKey();
+    }
+  }
+  
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, possible nulls, no repeat values, batch selection vector.
+   */
+  private void assignDoubleNullsNoRepeatingSelection(int index, int size,
+      DoubleColumnVector columnVector, int[] selected) {
+    for(int r = 0; r < size; ++r) {
+      if (!columnVector.isNull[r]) {
+        vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[selected[r]]);
+      } else {
+        vectorHashKeyWrappers[r].assignNullDouble(index);
+      }
+    }
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for Double type, repeat null values.
+   */
+  private void assignDoubleNullsRepeating(int index, int size,
+      DoubleColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignNullDouble(index);
+    }    
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for Double type, possible nulls, repeat values.
+   */
+  private void assignDoubleNullsNoRepeatingNoSelection(int index, int size,
+      DoubleColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      if (!columnVector.isNull[r]) {
+        vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[r]);
+      } else {
+        vectorHashKeyWrappers[r].assignNullDouble(index);
+      }
+    }    
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, no nulls, repeat values, no selection vector.
+   */
+  private void assignDoubleNoNullsRepeating(int index, int size, DoubleColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[0]);
+    }
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, no nulls, no repeat values, batch selection vector.
+   */
+  private void assignDoubleNoNullsNoRepeatingSelection(int index, int size,
+      DoubleColumnVector columnVector, int[] selected) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[selected[r]]);
+    }
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, no nulls, no repeat values, no selection vector.
+   */
+  private void assignDoubleNoNullsNoRepeatingNoSelection(int index, int size,
+      DoubleColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignDouble(index, columnVector.vector[r]);
+    }
+  }
+  
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, possible nulls, no repeat values, batch selection vector.
+   */
+  private void assignLongNullsNoRepeatingSelection(int index, int size,
+      LongColumnVector columnVector, int[] selected) {
+    for(int r = 0; r < size; ++r) {
+      if (!columnVector.isNull[selected[r]]) {
+        vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[selected[r]]);
+      } else {
+        vectorHashKeyWrappers[r].assignNullLong(index);
+      }
+    }
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, repeating nulls.
+   */
+  private void assignLongNullsRepeating(int index, int size,
+      LongColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignNullLong(index);
+    }
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, possible nulls, no repeat values, no selection vector.
+   */
+  private void assignLongNullsNoRepeatingNoSelection(int index, int size,
+      LongColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      if (!columnVector.isNull[r]) {
+        vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[r]);
+      } else {
+        vectorHashKeyWrappers[r].assignNullLong(index);
+      }
+    }    
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, no nulls, repeat values, no selection vector.
+   */
+  private void assignLongNoNullsRepeating(int index, int size, LongColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[0]);
+    }
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, no nulls, no repeat values, batch selection vector.
+   */
+  private void assignLongNoNullsNoRepeatingSelection(int index, int size,
+      LongColumnVector columnVector, int[] selected) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[selected[r]]);
+    }
+  }
+
+  /**
+   * Helper method to assign values from a vector column into the key wrapper.
+   * Optimized for double type, no nulls, no repeat values, no selection vector.
+   */
+  private void assignLongNoNullsNoRepeatingNoSelection(int index, int size,
+      LongColumnVector columnVector) {
+    for(int r = 0; r < size; ++r) {
+      vectorHashKeyWrappers[r].assignLong(index, columnVector.vector[r]);
+    }
+  }
+
+  /**
+   * Prepares a VectorHashKeyWrapperBatch to work for a specific set of keys.
+   * Computes the fast access lookup indices, preallocates all needed internal arrays.
+   * This step is done only once per query, not once per batch. The information computed now
+   * will be used to generate proper individual VectorKeyHashWrapper objects.
+   */
+  public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(VectorExpression[] keyExpressions)
+    throws HiveException {
+    VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch();
+    compiledKeyWrapperBatch.keyExpressions = keyExpressions;
+    
+    // We'll overallocate and then shrink the array for each type
+    int[] longIndices = new int[keyExpressions.length];
+    int longIndicesIndex = 0;
+    int[] doubleIndices = new int[keyExpressions.length];
+    int doubleIndicesIndex  = 0;
+    KeyLookupHelper[] indexLookup = new KeyLookupHelper[keyExpressions.length];
+    
+    // Inspect the output type of each key expression.
+    for(int i=0; i < keyExpressions.length; ++i) {
+      indexLookup[i] = new KeyLookupHelper();
+      String outputType = keyExpressions[i].getOutputType();
+      if (outputType.equalsIgnoreCase("long") || 
+          outputType.equalsIgnoreCase("bigint")) {
+        longIndices[longIndicesIndex] = i;
+        indexLookup[i].longIndex = longIndicesIndex;
+        indexLookup[i].doubleIndex = -1;
+        ++longIndicesIndex;
+      } else if (outputType.equalsIgnoreCase("double")) {
+        doubleIndices[doubleIndicesIndex] = i;
+        indexLookup[i].longIndex = -1;
+        indexLookup[i].doubleIndex = doubleIndicesIndex;
+        ++doubleIndicesIndex;
+      } else {
+        throw new HiveException("Unsuported vector output type: " + outputType);
+      }
+    }
+    compiledKeyWrapperBatch.indexLookup = indexLookup;
+    compiledKeyWrapperBatch.longKeyValueOutput = new LongWritable[longIndicesIndex];
+    for (int i=0; i < longIndicesIndex; ++i) {
+      compiledKeyWrapperBatch.longKeyValueOutput[i] = new LongWritable();
+    }
+    compiledKeyWrapperBatch.doubleKeyValueOutput = new DoubleWritable[doubleIndicesIndex];
+    for (int i=0; i < doubleIndicesIndex; ++i) {
+      compiledKeyWrapperBatch.doubleKeyValueOutput[i] = new DoubleWritable();
+    }
+    compiledKeyWrapperBatch.longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
+    compiledKeyWrapperBatch.doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
+    compiledKeyWrapperBatch.vectorHashKeyWrappers = 
+        new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
+    for(int i=0;i<VectorizedRowBatch.DEFAULT_SIZE; ++i) {
+      compiledKeyWrapperBatch.vectorHashKeyWrappers[i] = 
+          new VectorHashKeyWrapper(longIndicesIndex, doubleIndicesIndex);
+    }
+    return compiledKeyWrapperBatch;
+  }
+
+  /**
+   * Get the row-mode writable object value of a key from a key wrapper
+   */
+  public Object getWritableKeyValue(VectorHashKeyWrapper kw, int i) 
+    throws HiveException {
+    if (kw.getIsNull(i)) {
+      return null;
+    }
+    KeyLookupHelper klh = indexLookup[i];
+    if (klh.longIndex >= 0) {
+      longKeyValueOutput[klh.longIndex].set(kw.getLongValue(i));
+      return longKeyValueOutput[klh.longIndex];
+    } else if (klh.doubleIndex >= 0) {
+      doubleKeyValueOutput[klh.doubleIndex].set(kw.getDoubleValue(i));
+      return doubleKeyValueOutput[klh.doubleIndex];
+    } else {
+      throw new HiveException(String.format(
+          "Internal inconsistent KeyLookupHelper at index [%d]:%d %d",
+          i, klh.longIndex, klh.doubleIndex));
+    }
+  }  
+}
+

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java?rev=1485419&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java Wed May 22 20:58:08 2013
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+/**
+ * This maps a batch to the aggregation buffers sets to use for each row (key)  
+ *
+ */
+public class VectorAggregationBufferBatch {
+  
+  /**
+   * Batch sized array of aggregation buffer sets. 
+   * The array is preallocated and is reused for each batch, but the individual entries
+   * will reference different aggregation buffer set from batch to batch.
+   * the array is not reset between batches, content past this.index will be stale.
+   */
+  private VectorAggregationBufferRow[] aggregationBuffers;
+  
+  /**
+   * the selection vector that maps row within a batch to the 
+   * specific aggregation buffer set to use. 
+   */
+  private int[] selection;
+  
+  /**
+   * versioning number gets incremented on each batch. This allows us to cache the selection
+   * mapping info in the aggregation buffer set themselves while still being able to 
+   * detect stale info.
+   */
+  private int version;
+  
+  /**
+   * Get the number of distinct aggregation buffer sets (ie. keys) used in current batch.
+   */
+  private int distinctCount;
+  
+  /**
+   * the array of aggregation buffers for the current batch.
+   * content past the {@link #getDistinctBufferSetCount()} index
+   * is stale from previous batches.
+   * @return
+   */
+  public VectorAggregationBufferRow[] getAggregationBuffers() {
+    return aggregationBuffers;
+  }
+  
+  /**
+   * number of distinct aggregation buffer sets (ie. keys) in the current batch. 
+   * @return
+   */
+  public int getDistinctBufferSetCount () {
+    return distinctCount;
+  }
+
+  /**
+   * gets the selection vector to use for the current batch. This maps the batch rows by position 
+   * (row number) to an index in the {@link #getAggregationBuffers()} array.
+   * @return
+   */
+  public int[] getSelectionVector() {
+    return selection;
+  }
+  
+  public VectorAggregationBufferBatch() {
+    aggregationBuffers = new VectorAggregationBufferRow[VectorizedRowBatch.DEFAULT_SIZE];
+    selection = new int [VectorizedRowBatch.DEFAULT_SIZE];
+  }
+  
+  /**
+   * resets the internal aggregation buffers sets index and increments the versioning 
+   * used to optimize the selection vector population.
+   */
+  public void startBatch() {
+    version++;
+    distinctCount = 0;
+  }
+    
+  /**
+   * assigns the given aggregation buffer set to a given batch row (by row number).
+   * populates the selection vector appropriately. This is where the versioning numbers
+   * play a role in determining if the index cached on the aggregation buffer set is stale. 
+   */
+  public void mapAggregationBufferSet(VectorAggregationBufferRow bufferSet, int row) {
+    if (version != bufferSet.getVersion()) {
+      bufferSet.setVersionAndIndex(version, distinctCount);
+      ++distinctCount;
+    }
+    aggregationBuffers[row] = bufferSet;
+  }
+
+}

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java?rev=1485419&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java Wed May 22 20:58:08 2013
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+
+/**
+ * Represents a set of aggregation buffers to be used for a specific key for UDAF GROUP BY.
+ *
+ */
+public class VectorAggregationBufferRow {
+  private VectorAggregateExpression.AggregationBuffer[] aggregationBuffers;
+  private int version;
+  private int index;
+  
+  public VectorAggregationBufferRow(
+      VectorAggregateExpression.AggregationBuffer[] aggregationBuffers) {
+    this.aggregationBuffers = aggregationBuffers;
+  }
+  
+  /**
+   * returns the aggregation buffer for an aggregation expression, by index.
+   */
+  public VectorAggregateExpression.AggregationBuffer getAggregationBuffer(int bufferIndex) {
+    return aggregationBuffers[bufferIndex];
+  }
+
+  /**
+   * returns the array of aggregation buffers (the entire set).
+   */
+  public VectorAggregateExpression.AggregationBuffer[] getAggregationBuffers() {
+    return aggregationBuffers;
+  }
+
+  /** 
+   * Versioning used to detect staleness of the index cached for benefit of
+   * {@link org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferBatch VectorAggregationBufferBatch}.
+   */
+  public int getVersion() {
+    return version;
+  }
+  
+  /**
+   * cached index used by VectorAggregationBufferBatch.
+   * @return
+   */
+  public int getIndex() {
+    return index;
+  }
+
+  /**
+   * accessor for VectorAggregationBufferBatch to set its caching info on this set.
+   */
+  public void setVersionAndIndex(int version, int index) {
+    this.index  = index;
+    this.version = version;
+  }
+  
+}

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Wed May 22 20:58:08 2013
@@ -20,14 +20,20 @@ package org.apache.hadoop.hive.ql.exec.v
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapper;
+import org.apache.hadoop.hive.ql.exec.VectorHashKeyWrapperBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates
-    .VectorAggregateExpression.AggregationBuffer;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
@@ -37,19 +43,44 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 
 /**
- * Vectorized GROUP BY operator impelementation. Consumes the vectorized input and
- * stores the aggregates operators intermediate states. Emits row mode output.
+ * Vectorized GROUP BY operator implementation. Consumes the vectorized input and
+ * stores the aggregate operators' intermediate states. Emits row mode output.
  *
  */
 public class VectorGroupByOperator extends Operator<GroupByDesc> implements Serializable {
 
+  private static final Log LOG = LogFactory.getLog(
+      VectorGroupByOperator.class.getName());
+
   private final VectorizationContext vContext;
 
-  protected transient VectorAggregateExpression[] aggregators;
-  protected transient AggregationBuffer[] aggregationBuffers;
+  /**
+   * This is the vector of aggregators. They are stateless and only implement
+   * the algorithm of how to compute the aggregation. state is kept in the 
+   * aggregation buffers and is our responsibility to match the proper state for each key. 
+   */
+  private transient VectorAggregateExpression[] aggregators;
+  
+  /**
+   * Key vector expressions.
+   */
+  private transient VectorExpression[] keyExpressions;
+  
+  /**
+   * The aggregation buffers to use for the current batch.
+   */
+  private transient VectorAggregationBufferBatch aggregationBatchInfo;
+
+  /**
+   * The current batch key wrappers.
+   * The very same instance gets reused for all batches.
+   */
+  private transient VectorHashKeyWrapperBatch keyWrappersBatch;
 
-  transient int heartbeatInterval;
-  transient int countAfterReport;
+  /**
+   * The global key-aggregation hash map.
+   */
+  private transient Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
 
   private static final long serialVersionUID = 1L;
 
@@ -68,17 +99,22 @@ public class VectorGroupByOperator exten
       vContext.setOperatorType(OperatorType.GROUPBY);
 
       ArrayList<AggregationDesc> aggrDesc = conf.getAggregators();
+      keyExpressions = vContext.getVectorExpressions(conf.getKeys());
+      
+      for(int i = 0; i < keyExpressions.length; ++i) {
+        objectInspectors.add(vContext.createObjectInspector(keyExpressions[i]));
+      }
 
       aggregators = new VectorAggregateExpression[aggrDesc.size()];
-      aggregationBuffers = new AggregationBuffer[aggrDesc.size()];
       for (int i = 0; i < aggrDesc.size(); ++i) {
         AggregationDesc desc = aggrDesc.get(i);
         aggregators[i] = vContext.getAggregatorExpression (desc);
-        aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
-        aggregators[i].reset(aggregationBuffers[i]);
-
         objectInspectors.add(aggregators[i].getOutputObjectInspector());
       }
+      
+      keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+      aggregationBatchInfo = new VectorAggregationBufferBatch();
+      mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
 
       List<String> outputFieldNames = conf.getOutputColumnNames();
       outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
@@ -94,22 +130,120 @@ public class VectorGroupByOperator exten
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+    VectorizedRowBatch batch = (VectorizedRowBatch) row;
+    
+    // First we traverse the batch to evaluate and prepare the KeyWrappers
+    // After this the KeyWrappers are properly set and hash code is computed
+    keyWrappersBatch.evaluateBatch(batch);
+
+    // Next we locate the aggregation buffer set for each key
+    prepareBatchAggregationBufferSets(batch);
+    
+    // Finally, evaluate the aggregators
+    processAggregators(batch);
+  }
+   
+  /**
+   * Evaluates the aggregators on the current batch.
+   * The aggregationBatchInfo must have been prepared
+   * by calling {@link #prepareBatchAggregationBufferSets} first. 
+   */
+  private void processAggregators(VectorizedRowBatch batch) throws HiveException {
+    // We now have a vector of aggregation buffer sets to use for each row
+    // We can start computing the aggregates.
+    // If the number of distinct keys in the batch is 1 we can
+    // use the optimized code path of aggregateInput
+    VectorAggregationBufferRow[] aggregationBufferSets = 
+        aggregationBatchInfo.getAggregationBuffers();
+    if (aggregationBatchInfo.getDistinctBufferSetCount() == 1) {
+      VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = 
+          aggregationBufferSets[0].getAggregationBuffers();
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].aggregateInput(aggregationBuffers[i], batch);
+      }
+    } else {
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].aggregateInputSelection(
+            aggregationBufferSets,
+            i,
+            batch);
+      }
+    }
+  }
+  
+  /**
+   * Locates the aggregation buffer sets to use for each key in the current batch.
+   * The keyWrappersBatch must have evaluated the current batch first.
+   */
+  private void prepareBatchAggregationBufferSets(VectorizedRowBatch batch) throws HiveException {
+    // The aggregation batch vector needs to know when we start a new batch
+    // to bump its internal version. 
+    aggregationBatchInfo.startBatch();
+    
+    // We now have to probe the global hash and find-or-allocate
+    // the aggregation buffers to use for each key present in the batch
+    VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers();
+    for (int i=0; i < batch.size; ++i) {
+      VectorHashKeyWrapper kw = keyWrappers[i];
+      VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw);
+      if (null == aggregationBuffer) {
+        // the probe failed, we must allocate a set of aggregation buffers 
+        // and push the (keywrapper,buffers) pair into the hash.
+        // is very important to clone the keywrapper, the one we have from our
+        // keyWrappersBatch is going to be reset/reused on next batch.
+        aggregationBuffer = allocateAggregationBuffer();
+        mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
+      }
+      aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i);
+    }
+  }
 
-    //TODO: proper group by hash
-    for (int i = 0; i < aggregators.length; ++i) {
-      aggregators[i].aggregateInput(aggregationBuffers[i], vrg);
+  /**
+   * allocates a new aggregation buffer set.
+   */
+  private VectorAggregationBufferRow allocateAggregationBuffer() throws HiveException {
+    VectorAggregateExpression.AggregationBuffer[] aggregationBuffers = 
+        new VectorAggregateExpression.AggregationBuffer[aggregators.length];
+    for (int i=0; i < aggregators.length; ++i) {
+      aggregationBuffers[i] = aggregators[i].getNewAggregationBuffer();
+      aggregators[i].reset(aggregationBuffers[i]);
     }
+    VectorAggregationBufferRow bufferSet = new VectorAggregationBufferRow(aggregationBuffers);
+    return bufferSet;
   }
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
     if (!aborted) {
-      Object[] forwardCache = new Object[aggregators.length];
-      for (int i = 0; i < aggregators.length; ++i) {
-        forwardCache[i] = aggregators[i].evaluateOutput(aggregationBuffers[i]);
+      Object[] forwardCache = new Object[keyExpressions.length + aggregators.length];
+      if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) {
+        
+        // if this is a global aggregation (no keys) and empty set, must still emit NULLs
+        VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer();
+        for (int i = 0; i < aggregators.length; ++i) {
+          forwardCache[i] = aggregators[i].evaluateOutput(emptyBuffers.getAggregationBuffer(i));
+        }
+        forward(forwardCache, outputObjInspector);
+      } else {
+        
+        /* Iterate the global (keywrapper,aggregationbuffers) map and emit
+         a row for each key */
+        for(Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair: 
+          mapKeysAggregationBuffers.entrySet()){
+          int fi = 0;
+          for (int i = 0; i < keyExpressions.length; ++i) {
+            VectorHashKeyWrapper kw = (VectorHashKeyWrapper)pair.getKey();
+            forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (kw, i);
+          }
+          for (int i = 0; i < aggregators.length; ++i) {
+            forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue()
+                .getAggregationBuffer(i));
+          }
+          LOG.debug(String.format("forwarding keys: %s: %s", 
+              pair.getKey().toString(), Arrays.toString(forwardCache)));
+          forward(forwardCache, outputObjInspector);
+        }
       }
-      forward(forwardCache, outputObjInspector);
     }
   }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Wed May 22 20:58:08 2013
@@ -86,6 +86,7 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 
 /**
@@ -209,6 +210,9 @@ public class VectorizationContext {
 
   public VectorExpression[] getVectorExpressions(List<ExprNodeDesc> exprNodes) throws HiveException {
     int i = 0;
+    if (null == exprNodes) {
+      return new VectorExpression[0];
+    }
     VectorExpression[] ret = new VectorExpression[exprNodes.size()];
     for (ExprNodeDesc e : exprNodes) {
       ret[i++] = getVectorExpression(e);
@@ -1058,5 +1062,18 @@ public class VectorizationContext {
       return new LongColumnVector(defaultSize);
     }
   }
+
+  public ObjectInspector createObjectInspector(VectorExpression vectorExpression) 
+      throws HiveException {
+    String columnType = vectorExpression.getOutputType();
+    if (columnType.equalsIgnoreCase("long") ||
+        columnType.equalsIgnoreCase("bigint")) {
+      return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+    } else if (columnType.equalsIgnoreCase("double")) {
+      return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+    } else {
+      throw new HiveException(String.format("Must implement type %s", columnType));
+    }
+  }
 }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java Wed May 22 20:58:08 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -30,6 +31,8 @@ public abstract class VectorAggregateExp
   public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
   public abstract void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit)
         throws HiveException;
+  public abstract void aggregateInputSelection(VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex, VectorizedRowBatch vrg) throws HiveException;
   public abstract void reset(AggregationBuffer agg) throws HiveException;
   public abstract Object evaluateOutput(AggregationBuffer agg) throws HiveException;
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgDouble.java Wed May 22 20:58:08 2013
@@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
-    VectorAggregateExpression.AggregationBuffer;
+  VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -42,19 +43,27 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 
-/**
-* VectorUDAFAvgDouble. Vectorized implementation for AVG aggregates. 
-*/
+import org.apache.hadoop.hive.ql.io.orc.*;
+
 @Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: double)")
 public class VectorUDAFAvgDouble extends VectorAggregateExpression {
     
-    /** 
-    /* class for storing the current aggregate value.
-    */
-    static private final class Aggregation implements AggregationBuffer {
+    /** class for storing the current aggregate value. */
+    static class Aggregation implements AggregationBuffer {
       double sum;
       long count;
       boolean isNull;
+      
+      public void sumValue(double value) {
+        if (isNull) {
+          sum = value; 
+          count = 1;
+          isNull = false;
+        } else {
+          sum += value;
+          count++;
+        }
+      }
     }
     
     private VectorExpression inputExpression;
@@ -67,14 +76,16 @@ public class VectorUDAFAvgDouble extends
       super();
       this.inputExpression = inputExpression;
       partialResult = new Object[2];
-      partialResult[0] = resultCount = new LongWritable();
-      partialResult[1] = resultSum = new DoubleWritable();
+      resultCount = new LongWritable();
+      resultSum = new DoubleWritable();
+      partialResult[0] = resultCount;
+      partialResult[1] = resultSum;
       
       initPartialResultInspector();
     }
 
-  private void initPartialResultInspector () {
-      ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+    private void initPartialResultInspector() {
+        ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
         foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
         foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
         ArrayList<String> fname = new ArrayList<String>();
@@ -83,51 +94,238 @@ public class VectorUDAFAvgDouble extends
         soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
     }
     
+    private Aggregation getCurrentAggregationBuffer(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int bufferIndex,
+        int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex);
+      return myagg;
+    }
     
     @Override
-    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) 
-    throws HiveException {
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex, 
+      VectorizedRowBatch batch) throws HiveException {
       
-      inputExpression.evaluate(unit);
-      
-      DoubleColumnVector inputVector = (DoubleColumnVector)unit.
-      cols[this.inputExpression.getOutputColumn()];
-      
-      int batchSize = unit.size;
+      int batchSize = batch.size;
       
       if (batchSize == 0) {
         return;
       }
       
-      Aggregation myagg = (Aggregation)agg;
-
-      double[] vector = inputVector.vector;
+      inputExpression.evaluate(batch);
       
-      if (inputVector.isRepeating) {
-        if (inputVector.noNulls || !inputVector.isNull[0]) {
-          if (myagg.isNull) {
-            myagg.isNull = false;
-            myagg.sum = 0;
-            myagg.count = 0;
+      LongColumnVector inputVector = (LongColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+      long[] vector = inputVector.vector;
+
+      if (inputVector.noNulls) {
+        if (inputVector.isRepeating) {
+          iterateNoNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, bufferIndex,
+            vector[0], batchSize);
+        } else {
+          if (batch.selectedInUse) {
+            iterateNoNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector, batch.selected, batchSize);
+          } else {
+            iterateNoNullsWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector, batchSize);
+          }
+        }
+      } else {
+        if (inputVector.isRepeating) {
+          if (batch.selectedInUse) {
+            iterateHasNullsRepeatingSelectionWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector[0], batchSize, batch.selected, inputVector.isNull);
+          } else {
+            iterateHasNullsRepeatingWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector[0], batchSize, inputVector.isNull);
+          }
+        } else {
+          if (batch.selectedInUse) {
+            iterateHasNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector, batchSize, batch.selected, inputVector.isNull);
+          } else {
+            iterateHasNullsWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector, batchSize, inputVector.isNull);
           }
-          myagg.sum += vector[0]*batchSize;
-          myagg.count += batchSize;
         }
-        return;
       }
-      
-      if (!unit.selectedInUse && inputVector.noNulls) {
-        iterateNoSelectionNoNulls(myagg, vector, batchSize);
+    }
+
+    private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize) {
+
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          bufferIndex,
+          i);
+        myagg.sumValue(value);
       }
-      else if (!unit.selectedInUse) {
-        iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+    } 
+
+    private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int[] selection,
+      int batchSize) {
+      
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          bufferIndex,
+          i);
+        myagg.sumValue(values[selection[i]]);
       }
-      else if (inputVector.noNulls){
-        iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+    }
+
+    private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize) {
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          bufferIndex,
+          i);
+        myagg.sumValue(values[i]);
       }
-      else {
-        iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+    }
+
+    private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize,
+      int[] selection,
+      boolean[] isNull) {
+      
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[selection[i]]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            bufferIndex,
+            i);
+          myagg.sumValue(value);
+        }
       }
+      
+    }
+
+    private void iterateHasNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize,
+      boolean[] isNull) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            bufferIndex,
+            i);
+          myagg.sumValue(value);
+        }
+      }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize,
+      int[] selection,
+      boolean[] isNull) {
+
+      for (int j=0; j < batchSize; ++j) {
+        int i = selection[j];
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            bufferIndex,
+            j);
+          myagg.sumValue(values[i]);
+        }
+      }
+   }
+
+    private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize,
+      boolean[] isNull) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            bufferIndex,
+            i);
+          myagg.sumValue(values[i]);
+        }
+      }
+   }
+
+    
+    @Override
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
+        
+        inputExpression.evaluate(batch);
+        
+        DoubleColumnVector inputVector = (DoubleColumnVector)batch.cols[this.inputExpression.getOutputColumn()];
+        
+        int batchSize = batch.size;
+        
+        if (batchSize == 0) {
+          return;
+        }
+        
+        Aggregation myagg = (Aggregation)agg;
+  
+        double[] vector = inputVector.vector;
+        
+        if (inputVector.isRepeating) {
+          if (inputVector.noNulls || !inputVector.isNull[0]) {
+            if (myagg.isNull) {
+              myagg.isNull = false;
+              myagg.sum = 0;
+              myagg.count = 0;
+            }
+            myagg.sum += vector[0]*batchSize;
+            myagg.count += batchSize;
+          }
+          return;
+        }
+        
+        if (!batch.selectedInUse && inputVector.noNulls) {
+          iterateNoSelectionNoNulls(myagg, vector, batchSize);
+        }
+        else if (!batch.selectedInUse) {
+          iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+        }
+        else if (inputVector.noNulls){
+          iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
+        }
+        else {
+          iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
+        }
     }
   
     private void iterateSelectionHasNulls(
@@ -236,7 +434,7 @@ public class VectorUDAFAvgDouble extends
     
   @Override
     public ObjectInspector getOutputObjectInspector() {
-      return soi;
-    }
+    return soi;
+  }     
 }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFAvgLong.java Wed May 22 20:58:08 2013
@@ -24,7 +24,8 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
-    VectorAggregateExpression.AggregationBuffer;
+  VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -42,19 +43,27 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 
-/**
-* VectorUDAFAvgLong. Vectorized implementation for AVG aggregates. 
-*/
+import org.apache.hadoop.hive.ql.io.orc.*;
+
 @Description(name = "avg", value = "_FUNC_(expr) - Returns the average value of expr (vectorized, type: long)")
 public class VectorUDAFAvgLong extends VectorAggregateExpression {
     
-    /** 
-    /* class for storing the current aggregate value.
-    */
-    static private final class Aggregation implements AggregationBuffer {
+    /** class for storing the current aggregate value. */
+    static class Aggregation implements AggregationBuffer {
       long sum;
       long count;
       boolean isNull;
+      
+      public void sumValue(long value) {
+        if (isNull) {
+          sum = value; 
+          count = 1;
+          isNull = false;
+        } else {
+          sum += value;
+          count++;
+        }
+      }
     }
     
     private VectorExpression inputExpression;
@@ -67,14 +76,16 @@ public class VectorUDAFAvgLong extends V
       super();
       this.inputExpression = inputExpression;
       partialResult = new Object[2];
-      partialResult[0] = resultCount = new LongWritable();
-      partialResult[1] = resultSum = new DoubleWritable();
+      resultCount = new LongWritable();
+      resultSum = new DoubleWritable();
+      partialResult[0] = resultCount;
+      partialResult[1] = resultSum;
       
       initPartialResultInspector();
     }
 
-  private void initPartialResultInspector () {
-      ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
+    private void initPartialResultInspector() {
+        ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
         foi.add(PrimitiveObjectInspectorFactory.writableLongObjectInspector);
         foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
         ArrayList<String> fname = new ArrayList<String>();
@@ -83,51 +94,238 @@ public class VectorUDAFAvgLong extends V
         soi = ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
     }
     
+    private Aggregation getCurrentAggregationBuffer(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int bufferIndex,
+        int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(bufferIndex);
+      return myagg;
+    }
     
     @Override
-    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) 
-    throws HiveException {
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex, 
+      VectorizedRowBatch batch) throws HiveException {
       
-      inputExpression.evaluate(unit);
-      
-      LongColumnVector inputVector = (LongColumnVector)unit.
-      cols[this.inputExpression.getOutputColumn()];
-      
-      int batchSize = unit.size;
+      int batchSize = batch.size;
       
       if (batchSize == 0) {
         return;
       }
       
-      Aggregation myagg = (Aggregation)agg;
-
-      long[] vector = inputVector.vector;
+      inputExpression.evaluate(batch);
       
-      if (inputVector.isRepeating) {
-        if (inputVector.noNulls || !inputVector.isNull[0]) {
-          if (myagg.isNull) {
-            myagg.isNull = false;
-            myagg.sum = 0;
-            myagg.count = 0;
+      LongColumnVector inputVector = (LongColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+      long[] vector = inputVector.vector;
+
+      if (inputVector.noNulls) {
+        if (inputVector.isRepeating) {
+          iterateNoNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, bufferIndex,
+            vector[0], batchSize);
+        } else {
+          if (batch.selectedInUse) {
+            iterateNoNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector, batch.selected, batchSize);
+          } else {
+            iterateNoNullsWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector, batchSize);
+          }
+        }
+      } else {
+        if (inputVector.isRepeating) {
+          if (batch.selectedInUse) {
+            iterateHasNullsRepeatingSelectionWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector[0], batchSize, batch.selected, inputVector.isNull);
+          } else {
+            iterateHasNullsRepeatingWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector[0], batchSize, inputVector.isNull);
+          }
+        } else {
+          if (batch.selectedInUse) {
+            iterateHasNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector, batchSize, batch.selected, inputVector.isNull);
+          } else {
+            iterateHasNullsWithAggregationSelection(
+              aggregationBufferSets, bufferIndex,
+              vector, batchSize, inputVector.isNull);
           }
-          myagg.sum += vector[0]*batchSize;
-          myagg.count += batchSize;
         }
-        return;
       }
-      
-      if (!unit.selectedInUse && inputVector.noNulls) {
-        iterateNoSelectionNoNulls(myagg, vector, batchSize);
+    }
+
+    private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize) {
+
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          bufferIndex,
+          i);
+        myagg.sumValue(value);
       }
-      else if (!unit.selectedInUse) {
-        iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+    } 
+
+    private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int[] selection,
+      int batchSize) {
+      
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          bufferIndex,
+          i);
+        myagg.sumValue(values[selection[i]]);
       }
-      else if (inputVector.noNulls){
-        iterateSelectionNoNulls(myagg, vector, batchSize, unit.selected);
+    }
+
+    private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize) {
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          bufferIndex,
+          i);
+        myagg.sumValue(values[i]);
       }
-      else {
-        iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, unit.selected);
+    }
+
+    private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize,
+      int[] selection,
+      boolean[] isNull) {
+      
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[selection[i]]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            bufferIndex,
+            i);
+          myagg.sumValue(value);
+        }
       }
+      
+    }
+
+    private void iterateHasNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long value,
+      int batchSize,
+      boolean[] isNull) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            bufferIndex,
+            i);
+          myagg.sumValue(value);
+        }
+      }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize,
+      int[] selection,
+      boolean[] isNull) {
+
+      for (int j=0; j < batchSize; ++j) {
+        int i = selection[j];
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            bufferIndex,
+            j);
+          myagg.sumValue(values[i]);
+        }
+      }
+   }
+
+    private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int bufferIndex,
+      long[] values,
+      int batchSize,
+      boolean[] isNull) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            bufferIndex,
+            i);
+          myagg.sumValue(values[i]);
+        }
+      }
+   }
+
+    
+    @Override
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) throws HiveException {
+        
+        inputExpression.evaluate(batch);
+        
+        LongColumnVector inputVector = (LongColumnVector)batch.cols[this.inputExpression.getOutputColumn()];
+        
+        int batchSize = batch.size;
+        
+        if (batchSize == 0) {
+          return;
+        }
+        
+        Aggregation myagg = (Aggregation)agg;
+  
+        long[] vector = inputVector.vector;
+        
+        if (inputVector.isRepeating) {
+          if (inputVector.noNulls || !inputVector.isNull[0]) {
+            if (myagg.isNull) {
+              myagg.isNull = false;
+              myagg.sum = 0;
+              myagg.count = 0;
+            }
+            myagg.sum += vector[0]*batchSize;
+            myagg.count += batchSize;
+          }
+          return;
+        }
+        
+        if (!batch.selectedInUse && inputVector.noNulls) {
+          iterateNoSelectionNoNulls(myagg, vector, batchSize);
+        }
+        else if (!batch.selectedInUse) {
+          iterateNoSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull);
+        }
+        else if (inputVector.noNulls){
+          iterateSelectionNoNulls(myagg, vector, batchSize, batch.selected);
+        }
+        else {
+          iterateSelectionHasNulls(myagg, vector, batchSize, inputVector.isNull, batch.selected);
+        }
     }
   
     private void iterateSelectionHasNulls(
@@ -236,7 +434,7 @@ public class VectorUDAFAvgLong extends V
     
   @Override
     public ObjectInspector getOutputObjectInspector() {
-      return soi;
-    }
+    return soi;
+  }     
 }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java Wed May 22 20:58:08 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
     VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -52,6 +53,13 @@ public class VectorUDAFCountDouble exten
     static class Aggregation implements AggregationBuffer {
       long value;
       boolean isNull;
+      
+      public void initIfNull() {
+        if (isNull) {
+          isNull = false;
+          value = 0;
+        }
+      }
     }
     
     private VectorExpression inputExpression;
@@ -62,17 +70,112 @@ public class VectorUDAFCountDouble exten
       this.inputExpression = inputExpression;
       result = new LongWritable(0);
     }
+
+    private Aggregation getCurrentAggregationBuffer(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex,
+        int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+      return myagg;
+    }
     
     @Override
-    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) 
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex, 
+      VectorizedRowBatch batch) throws HiveException {
+      
+      int batchSize = batch.size;
+      
+      if (batchSize == 0) {
+        return;
+      }
+
+      inputExpression.evaluate(batch);
+      
+      DoubleColumnVector inputVector = (DoubleColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+
+      if (inputVector.noNulls) {
+          // if there are no nulls then the iteration is the same on all cases
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex, batchSize);
+      } else if (!batch.selectedInUse) {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex, 
+            batchSize, inputVector.isNull);
+      } else if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex, 
+            batchSize, batch.selected, inputVector.isNull);
+      }
+    }
+    
+    private void iterateNoNullsWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex, 
+        int batchSize) {
+        
+        for (int i=0; i < batchSize; ++i) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            aggregateIndex,
+            i);
+          myagg.initIfNull();
+          myagg.value++;
+        }
+    }
+
+    private void iterateHasNullsWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex, 
+        int batchSize,
+        boolean[] isNull) {
+        
+        for (int i=0; i < batchSize; ++i) {
+          if (!isNull[i]) {
+            Aggregation myagg = getCurrentAggregationBuffer(
+              aggregationBufferSets, 
+              aggregateIndex,
+              i);
+            myagg.initIfNull();
+            myagg.value++;
+          }
+        }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex, 
+        int batchSize,
+        int[] selection,
+        boolean[] isNull) {
+        
+        for (int j=0; j < batchSize; ++j) {
+          int i = selection[j];
+          if (!isNull[i]) {
+            Aggregation myagg = getCurrentAggregationBuffer(
+              aggregationBufferSets, 
+              aggregateIndex,
+              j);
+            myagg.initIfNull();
+            myagg.value++;
+          }
+        }
+    }
+
+    
+    @Override
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) 
     throws HiveException {
       
-      inputExpression.evaluate(unit);
+      inputExpression.evaluate(batch);
       
-      DoubleColumnVector inputVector = (DoubleColumnVector)unit.
+      DoubleColumnVector inputVector = (DoubleColumnVector)batch.
         cols[this.inputExpression.getOutputColumn()];
       
-      int batchSize = unit.size;
+      int batchSize = batch.size;
       
       if (batchSize == 0) {
         return;
@@ -80,11 +183,8 @@ public class VectorUDAFCountDouble exten
       
       Aggregation myagg = (Aggregation)agg;
 
-      if (myagg.isNull) {
-        myagg.value = 0;
-        myagg.isNull = false;
-      }
-      
+      myagg.initIfNull();
+
       if (inputVector.isRepeating) {
         if (inputVector.noNulls || !inputVector.isNull[0]) {
           myagg.value += batchSize;
@@ -96,11 +196,11 @@ public class VectorUDAFCountDouble exten
         myagg.value += batchSize;
         return;
       }
-      else if (!unit.selectedInUse) {
+      else if (!batch.selectedInUse) {
         iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
       }
       else {
-        iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected);
+        iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
       }
     }
   

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java?rev=1485419&r1=1485418&r2=1485419&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java Wed May 22 20:58:08 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
     VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
@@ -52,6 +53,13 @@ public class VectorUDAFCountLong extends
     static class Aggregation implements AggregationBuffer {
       long value;
       boolean isNull;
+      
+      public void initIfNull() {
+        if (isNull) {
+          isNull = false;
+          value = 0;
+        }
+      }
     }
     
     private VectorExpression inputExpression;
@@ -62,17 +70,112 @@ public class VectorUDAFCountLong extends
       this.inputExpression = inputExpression;
       result = new LongWritable(0);
     }
+
+    private Aggregation getCurrentAggregationBuffer(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex,
+        int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+      return myagg;
+    }
     
     @Override
-    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch unit) 
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex, 
+      VectorizedRowBatch batch) throws HiveException {
+      
+      int batchSize = batch.size;
+      
+      if (batchSize == 0) {
+        return;
+      }
+
+      inputExpression.evaluate(batch);
+      
+      LongColumnVector inputVector = (LongColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+
+      if (inputVector.noNulls) {
+          // if there are no nulls then the iteration is the same on all cases
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex, batchSize);
+      } else if (!batch.selectedInUse) {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex, 
+            batchSize, inputVector.isNull);
+      } else if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex, 
+            batchSize, batch.selected, inputVector.isNull);
+      }
+    }
+    
+    private void iterateNoNullsWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex, 
+        int batchSize) {
+        
+        for (int i=0; i < batchSize; ++i) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            aggregateIndex,
+            i);
+          myagg.initIfNull();
+          myagg.value++;
+        }
+    }
+
+    private void iterateHasNullsWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex, 
+        int batchSize,
+        boolean[] isNull) {
+        
+        for (int i=0; i < batchSize; ++i) {
+          if (!isNull[i]) {
+            Aggregation myagg = getCurrentAggregationBuffer(
+              aggregationBufferSets, 
+              aggregateIndex,
+              i);
+            myagg.initIfNull();
+            myagg.value++;
+          }
+        }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex, 
+        int batchSize,
+        int[] selection,
+        boolean[] isNull) {
+        
+        for (int j=0; j < batchSize; ++j) {
+          int i = selection[j];
+          if (!isNull[i]) {
+            Aggregation myagg = getCurrentAggregationBuffer(
+              aggregationBufferSets, 
+              aggregateIndex,
+              j);
+            myagg.initIfNull();
+            myagg.value++;
+          }
+        }
+    }
+
+    
+    @Override
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) 
     throws HiveException {
       
-      inputExpression.evaluate(unit);
+      inputExpression.evaluate(batch);
       
-      LongColumnVector inputVector = (LongColumnVector)unit.
+      LongColumnVector inputVector = (LongColumnVector)batch.
         cols[this.inputExpression.getOutputColumn()];
       
-      int batchSize = unit.size;
+      int batchSize = batch.size;
       
       if (batchSize == 0) {
         return;
@@ -80,11 +183,8 @@ public class VectorUDAFCountLong extends
       
       Aggregation myagg = (Aggregation)agg;
 
-      if (myagg.isNull) {
-        myagg.value = 0;
-        myagg.isNull = false;
-      }
-      
+      myagg.initIfNull();
+
       if (inputVector.isRepeating) {
         if (inputVector.noNulls || !inputVector.isNull[0]) {
           myagg.value += batchSize;
@@ -96,11 +196,11 @@ public class VectorUDAFCountLong extends
         myagg.value += batchSize;
         return;
       }
-      else if (!unit.selectedInUse) {
+      else if (!batch.selectedInUse) {
         iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
       }
       else {
-        iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, unit.selected);
+        iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
       }
     }