Posted to commits@hive.apache.org by ha...@apache.org on 2013/06/03 20:15:17 UTC

svn commit: r1489089 - in /hive/branches/vectorization/ql/src: java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/ java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/ java/or...

Author: hashutosh
Date: Mon Jun  3 18:15:16 2013
New Revision: 1489089

URL: http://svn.apache.org/r1489089
Log:
HIVE-4451 : Add support for string column type vector aggregates: COUNT, MIN and MAX (Remus Rusanu via Ashutosh Chauhan)

Added:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt
Removed:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountDouble.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFCountLong.java
Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1489089&r1=1489088&r2=1489089&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Mon Jun  3 18:15:16 2013
@@ -41,15 +41,16 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.expressions.SelectColumnIsTrue;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgDouble;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgLong;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFCountDouble;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFCountLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxDouble;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxLong;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxString;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDouble;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinLong;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinString;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDouble;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopLong;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdSampDouble;
@@ -941,11 +942,14 @@ public class VectorizationContext {
   static Object[][] aggregatesDefinition = {
     {"min",       "Long",   VectorUDAFMinLong.class},
     {"min",       "Double", VectorUDAFMinDouble.class},
+    {"min",       "String", VectorUDAFMinString.class},
     {"max",       "Long",   VectorUDAFMaxLong.class},
     {"max",       "Double", VectorUDAFMaxDouble.class},
+    {"max",       "String", VectorUDAFMaxString.class},
     {"count",     null,     VectorUDAFCountStar.class},
-    {"count",     "Long",   VectorUDAFCountLong.class},
-    {"count",     "Double", VectorUDAFCountDouble.class},
+    {"count",     "Long",   VectorUDAFCount.class},
+    {"count",     "Double", VectorUDAFCount.class},
+    {"count",     "String", VectorUDAFCount.class},
     {"sum",       "Long",   VectorUDAFSumLong.class},
     {"sum",       "Double", VectorUDAFSumDouble.class},
     {"avg",       "Long",   VectorUDAFAvgLong.class},

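For reference, the aggregatesDefinition table patched above maps an (aggregate name, input type) pair to the implementing class, with a null type matching the argument-less COUNT(*). A minimal sketch of how such a table can be consulted, using a hypothetical helper name (this is not the actual VectorizationContext API):

    // Illustrative only: resolve an aggregate class from a definition table
    // shaped like aggregatesDefinition; returns null when nothing matches.
    static Class<?> lookupAggregate(Object[][] defs, String name, String type) {
      for (Object[] def : defs) {
        // def[1] == null matches the type-less COUNT(*) entry
        boolean typeMatches = (def[1] == null) ? (type == null) : def[1].equals(type);
        if (def[0].equals(name) && typeMatches) {
          return (Class<?>) def[2];
        }
      }
      return null; // no vectorized implementation registered for this pair
    }
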
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java?rev=1489089&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java Mon Jun  3 18:15:16 2013
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+* VectorUDAFCount. Vectorized implementation for COUNT aggregates.
+*/
+@Description(name = "count", value = "_FUNC_(expr) - Returns the count (vectorized)")
+public class VectorUDAFCount extends VectorAggregateExpression {
+
+    /**
+     * Class for storing the current aggregate value.
+     */
+    static class Aggregation implements AggregationBuffer {
+      long value;
+      boolean isNull;
+
+      public void initIfNull() {
+        if (isNull) {
+          isNull = false;
+          value = 0;
+        }
+      }
+    }
+
+    private final VectorExpression inputExpression;
+    private final LongWritable result;
+
+    public VectorUDAFCount(VectorExpression inputExpression) {
+      super();
+      this.inputExpression = inputExpression;
+      result = new LongWritable(0);
+    }
+
+    private Aggregation getCurrentAggregationBuffer(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex,
+        int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregateIndex);
+      return myagg;
+    }
+
+    @Override
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregateIndex,
+      VectorizedRowBatch batch) throws HiveException {
+
+      int batchSize = batch.size;
+
+      if (batchSize == 0) {
+        return;
+      }
+
+      inputExpression.evaluate(batch);
+
+      ColumnVector inputVector = batch.cols[this.inputExpression.getOutputColumn()];
+
+      if (inputVector.noNulls) {
+          // if there are no nulls then the iteration is the same in all cases
+          iterateNoNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex, batchSize);
+      } else if (!batch.selectedInUse) {
+          iterateHasNullsWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            batchSize, inputVector.isNull);
+      } else if (batch.selectedInUse) {
+          iterateHasNullsSelectionWithAggregationSelection(
+            aggregationBufferSets, aggregateIndex,
+            batchSize, batch.selected, inputVector.isNull);
+      }
+    }
+
+    private void iterateNoNullsWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex,
+        int batchSize) {
+
+        for (int i=0; i < batchSize; ++i) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets,
+            aggregateIndex,
+            i);
+          myagg.initIfNull();
+          myagg.value++;
+        }
+    }
+
+    private void iterateHasNullsWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex,
+        int batchSize,
+        boolean[] isNull) {
+
+        for (int i=0; i < batchSize; ++i) {
+          if (!isNull[i]) {
+            Aggregation myagg = getCurrentAggregationBuffer(
+              aggregationBufferSets,
+              aggregateIndex,
+              i);
+            myagg.initIfNull();
+            myagg.value++;
+          }
+        }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregateIndex,
+        int batchSize,
+        int[] selection,
+        boolean[] isNull) {
+
+        for (int j=0; j < batchSize; ++j) {
+          int i = selection[j];
+          if (!isNull[i]) {
+            Aggregation myagg = getCurrentAggregationBuffer(
+              aggregationBufferSets,
+              aggregateIndex,
+              j);
+            myagg.initIfNull();
+            myagg.value++;
+          }
+        }
+    }
+
+
+    @Override
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+    throws HiveException {
+
+      inputExpression.evaluate(batch);
+
+      ColumnVector inputVector = batch.cols[this.inputExpression.getOutputColumn()];
+
+      int batchSize = batch.size;
+
+      if (batchSize == 0) {
+        return;
+      }
+
+      Aggregation myagg = (Aggregation)agg;
+
+      myagg.initIfNull();
+
+      if (inputVector.isRepeating) {
+        if (inputVector.noNulls || !inputVector.isNull[0]) {
+          myagg.value += batchSize;
+        }
+        return;
+      }
+
+      if (inputVector.noNulls) {
+        myagg.value += batchSize;
+        return;
+      }
+      else if (!batch.selectedInUse) {
+        iterateNoSelectionHasNulls(myagg, batchSize, inputVector.isNull);
+      }
+      else {
+        iterateSelectionHasNulls(myagg, batchSize, inputVector.isNull, batch.selected);
+      }
+    }
+
+    private void iterateSelectionHasNulls(
+        Aggregation myagg,
+        int batchSize,
+        boolean[] isNull,
+        int[] selected) {
+
+      for (int j=0; j< batchSize; ++j) {
+        int i = selected[j];
+        if (!isNull[i]) {
+          myagg.value += 1;
+        }
+      }
+    }
+
+    private void iterateNoSelectionHasNulls(
+        Aggregation myagg,
+        int batchSize,
+        boolean[] isNull) {
+
+      for (int i=0; i< batchSize; ++i) {
+        if (!isNull[i]) {
+          myagg.value += 1;
+        }
+      }
+    }
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      return new Aggregation();
+    }
+
+    @Override
+    public void reset(AggregationBuffer agg) throws HiveException {
+      Aggregation myAgg = (Aggregation) agg;
+      myAgg.isNull = true;
+    }
+
+    @Override
+    public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
+      Aggregation myagg = (Aggregation) agg;
+      if (myagg.isNull) {
+        return null;
+      }
+      else {
+        result.set(myagg.value);
+        return result;
+      }
+    }
+
+    @Override
+    public ObjectInspector getOutputObjectInspector() {
+      return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+    }
+
+}
+

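The class above applies the same per-batch counting rules to every column type, which is why a single VectorUDAFCount can replace the generated Long and Double variants. A compact restatement of those rules as a standalone sketch (batch state flattened into plain parameters, selection vector ignored for brevity):

    // Sketch of the COUNT fast paths used above: a repeating vector stands
    // in for the whole batch, and noNulls skips the per-row scan entirely.
    static long countBatch(boolean noNulls, boolean isRepeating,
                           boolean[] isNull, int batchSize) {
      if (isRepeating) {
        return (noNulls || !isNull[0]) ? batchSize : 0;
      }
      if (noNulls) {
        return batchSize;        // every row contributes
      }
      long count = 0;
      for (int i = 0; i < batchSize; i++) {
        if (!isNull[i]) {
          count++;               // only non-null rows are counted
        }
      }
      return count;
    }
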
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java?rev=1489089&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMaxString.java Mon Jun  3 18:15:16 2013
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+    VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+* VectorUDAFMaxString. Vectorized implementation for MIN/MAX aggregates. 
+*/
+@Description(name = "max", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: string)")
+public class VectorUDAFMaxString extends VectorAggregateExpression {
+
+    /**
+     * Class for storing the current aggregate value.
+     */
+    static private final class Aggregation implements AggregationBuffer {
+
+      final static int MIN_BUFFER_SIZE = 16;
+      byte[] bytes = new byte[MIN_BUFFER_SIZE];
+      int length;
+      boolean isNull;
+
+      public void checkValue(byte[] bytes, int start, int length) {
+        if (isNull) {
+          isNull = false;
+          assign(bytes, start, length);
+        } else if (StringExpr.compare(
+                bytes, start, length,
+                this.bytes, 0, this.length) > 0) {
+          assign(bytes, start, length);
+        }
+      }
+      
+      public void assign(byte[] bytes, int start, int length) {
+        // Avoid new allocation if possible
+        if (this.bytes.length < length) {
+          this.bytes = new byte[length];
+        }
+        System.arraycopy(bytes, start, this.bytes, 0, length);
+        this.length = length;
+      }
+    }
+    
+    private VectorExpression inputExpression;
+    private BytesWritable result;
+    
+    public VectorUDAFMaxString(VectorExpression inputExpression) {
+      super();
+      this.inputExpression = inputExpression;
+      result = new BytesWritable();
+    }
+    
+    private Aggregation getCurrentAggregationBuffer(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregrateIndex,
+        int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+      return myagg;
+    }
+    
+    @Override
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex, 
+      VectorizedRowBatch batch) throws HiveException {
+      
+      int batchSize = batch.size;
+      
+      if (batchSize == 0) {
+        return;
+      }
+      
+      inputExpression.evaluate(batch);
+      
+      BytesColumnVector inputColumn = (BytesColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+
+      if (inputColumn.noNulls) {
+        if (inputColumn.isRepeating) {
+          iterateNoNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            inputColumn, batchSize);
+        } else {
+          if (batch.selectedInUse) {
+            iterateNoNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batch.selected, batchSize);
+          } else {
+            iterateNoNullsWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize);
+          }
+        }
+      } else {
+        if (inputColumn.isRepeating) {
+          // All nulls, no-op for min/max
+        } else {
+          if (batch.selectedInUse) {
+            iterateHasNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize, batch.selected);
+          } else {
+            iterateHasNullsWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize);
+          }
+        }
+      }
+    }
+
+    private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+
+      byte[] bytes = inputColumn.vector[0];
+      int start = inputColumn.start[0];
+      int length = inputColumn.length[0];
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(bytes, start, length);
+      }
+    } 
+
+    private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int[] selection,
+      int batchSize) {
+      
+      for (int i=0; i < batchSize; ++i) {
+        int row = selection[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(inputColumn.vector[row],
+          inputColumn.start[row],
+          inputColumn.length[row]);
+      }
+    }
+
+    private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize,
+      int[] selection) {
+
+      for (int i=0; i < batchSize; ++i) {
+        int row = selection[i];
+        if (!inputColumn.isNull[row]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            aggregrateIndex,
+            i);
+          myagg.checkValue(inputColumn.vector[row],
+            inputColumn.start[row],
+            inputColumn.length[row]);
+        }
+      }
+    }
+
+    private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!inputColumn.isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            aggregrateIndex,
+            i);
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+    
+    @Override
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) 
+      throws HiveException {
+        
+        inputExpression.evaluate(batch);
+        
+        BytesColumnVector inputColumn = (BytesColumnVector)batch.
+            cols[this.inputExpression.getOutputColumn()];
+        
+        int batchSize = batch.size;
+        
+        if (batchSize == 0) {
+          return;
+        }
+        
+        Aggregation myagg = (Aggregation)agg;
+          
+        if (inputColumn.isRepeating) {
+          if (inputColumn.noNulls) {
+            myagg.checkValue(inputColumn.vector[0],
+              inputColumn.start[0],
+              inputColumn.length[0]);
+          }
+          return;
+        }
+        
+        if (!batch.selectedInUse && inputColumn.noNulls) {
+          iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+        }
+        else if (!batch.selectedInUse) {
+          iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+        }
+        else if (inputColumn.noNulls){
+          iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+        }
+        else {
+          iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+        }
+    }
+  
+    private void iterateSelectionHasNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize,
+        int[] selected) {
+      
+      for (int j=0; j< batchSize; ++j) {
+        int i = selected[j];
+        if (!inputColumn.isNull[i]) {
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+
+    private void iterateSelectionNoNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize, 
+        int[] selected) {
+      
+      for (int j=0; j < batchSize; ++j) {
+        int i = selected[j];
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    private void iterateNoSelectionHasNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize) {
+      
+      for (int i=0; i< batchSize; ++i) {
+        if (!inputColumn.isNull[i]) {
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+
+    private void iterateNoSelectionNoNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn,
+        int batchSize) {
+      for (int i=0; i< batchSize; ++i) {
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      return new Aggregation();
+    }
+
+    @Override
+    public void reset(AggregationBuffer agg) throws HiveException {
+      Aggregation myAgg = (Aggregation) agg;
+      myAgg.isNull = true;
+    }
+
+    @Override
+    public Object evaluateOutput(
+        AggregationBuffer agg) throws HiveException {
+      Aggregation myagg = (Aggregation) agg;
+      if (myagg.isNull) {
+        return null;
+      }
+      else {
+        result.set(myagg.bytes, 0, myagg.length);
+        return result;
+      }
+    }
+    
+    @Override
+    public ObjectInspector getOutputObjectInspector() {
+      return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+    }
+}
+

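MIN/MAX over strings reduces to comparisons of raw byte ranges through StringExpr.compare. A sketch of the lexicographic, unsigned-byte ordering assumed by the code above (the actual StringExpr implementation may differ in detail):

    // Illustrative comparator: negative if a sorts before b, positive if
    // after, comparing bytes as unsigned values (UTF-8 binary order).
    static int compareBytes(byte[] a, int aStart, int aLen,
                            byte[] b, int bStart, int bLen) {
      int n = Math.min(aLen, bLen);
      for (int i = 0; i < n; i++) {
        int x = a[aStart + i] & 0xff;
        int y = b[bStart + i] & 0xff;
        if (x != y) {
          return x - y;
        }
      }
      return aLen - bLen; // equal prefix: the shorter string sorts first
    }
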
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java?rev=1489089&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen/VectorUDAFMinString.java Mon Jun  3 18:15:16 2013
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+    VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+* VectorUDAFMinString. Vectorized implementation for MIN/MAX aggregates. 
+*/
+@Description(name = "min", value = "_FUNC_(expr) - Returns the minimum value of expr (vectorized, type: string)")
+public class VectorUDAFMinString extends VectorAggregateExpression {
+
+    /**
+     * Class for storing the current aggregate value.
+     */
+    static private final class Aggregation implements AggregationBuffer {
+
+      final static int MIN_BUFFER_SIZE = 16;
+      byte[] bytes = new byte[MIN_BUFFER_SIZE];
+      int length;
+      boolean isNull;
+
+      public void checkValue(byte[] bytes, int start, int length) {
+        if (isNull) {
+          isNull = false;
+          assign(bytes, start, length);
+        } else if (StringExpr.compare(
+                bytes, start, length,
+                this.bytes, 0, this.length) < 0) {
+          assign(bytes, start, length);
+        }
+      }
+      
+      public void assign(byte[] bytes, int start, int length) {
+        // Avoid new allocation if possible
+        if (this.bytes.length < length) {
+          this.bytes = new byte[length];
+        }
+        System.arraycopy(bytes, start, this.bytes, 0, length);
+        this.length = length;
+      }
+    }
+    
+    private VectorExpression inputExpression;
+    private BytesWritable result;
+    
+    public VectorUDAFMinString(VectorExpression inputExpression) {
+      super();
+      this.inputExpression = inputExpression;
+      result = new BytesWritable();
+    }
+    
+    private Aggregation getCurrentAggregationBuffer(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregrateIndex,
+        int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+      return myagg;
+    }
+    
+    @Override
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex, 
+      VectorizedRowBatch batch) throws HiveException {
+      
+      int batchSize = batch.size;
+      
+      if (batchSize == 0) {
+        return;
+      }
+      
+      inputExpression.evaluate(batch);
+      
+      BytesColumnVector inputColumn = (BytesColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+
+      if (inputColumn.noNulls) {
+        if (inputColumn.isRepeating) {
+          iterateNoNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            inputColumn, batchSize);
+        } else {
+          if (batch.selectedInUse) {
+            iterateNoNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batch.selected, batchSize);
+          } else {
+            iterateNoNullsWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize);
+          }
+        }
+      } else {
+        if (inputColumn.isRepeating) {
+          // All nulls, no-op for min/max
+        } else {
+          if (batch.selectedInUse) {
+            iterateHasNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize, batch.selected);
+          } else {
+            iterateHasNullsWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize);
+          }
+        }
+      }
+    }
+
+    private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+
+      byte[] bytes = inputColumn.vector[0];
+      int start = inputColumn.start[0];
+      int length = inputColumn.length[0];
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(bytes, start, length);
+      }
+    } 
+
+    private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int[] selection,
+      int batchSize) {
+      
+      for (int i=0; i < batchSize; ++i) {
+        int row = selection[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(inputColumn.vector[row],
+          inputColumn.start[row],
+          inputColumn.length[row]);
+      }
+    }
+
+    private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize,
+      int[] selection) {
+
+      for (int i=0; i < batchSize; ++i) {
+        int row = selection[i];
+        if (!inputColumn.isNull[row]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            aggregrateIndex,
+            i);
+          myagg.checkValue(inputColumn.vector[row],
+            inputColumn.start[row],
+            inputColumn.length[row]);
+        }
+      }
+    }
+
+    private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!inputColumn.isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            aggregrateIndex,
+            i);
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+    
+    @Override
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) 
+      throws HiveException {
+        
+        inputExpression.evaluate(batch);
+        
+        BytesColumnVector inputColumn = (BytesColumnVector)batch.
+            cols[this.inputExpression.getOutputColumn()];
+        
+        int batchSize = batch.size;
+        
+        if (batchSize == 0) {
+          return;
+        }
+        
+        Aggregation myagg = (Aggregation)agg;
+          
+        if (inputColumn.isRepeating) {
+          if (inputColumn.noNulls) {
+            myagg.checkValue(inputColumn.vector[0],
+              inputColumn.start[0],
+              inputColumn.length[0]);
+          }
+          return;
+        }
+        
+        if (!batch.selectedInUse && inputColumn.noNulls) {
+          iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+        }
+        else if (!batch.selectedInUse) {
+          iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+        }
+        else if (inputColumn.noNulls){
+          iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+        }
+        else {
+          iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+        }
+    }
+  
+    private void iterateSelectionHasNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize,
+        int[] selected) {
+      
+      for (int j=0; j< batchSize; ++j) {
+        int i = selected[j];
+        if (!inputColumn.isNull[i]) {
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+
+    private void iterateSelectionNoNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize, 
+        int[] selected) {
+      
+      for (int j=0; j < batchSize; ++j) {
+        int i = selected[j];
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    private void iterateNoSelectionHasNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize) {
+      
+      for (int i=0; i< batchSize; ++i) {
+        if (!inputColumn.isNull[i]) {
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+
+    private void iterateNoSelectionNoNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn,
+        int batchSize) {
+      for (int i=0; i< batchSize; ++i) {
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      return new Aggregation();
+    }
+
+    @Override
+    public void reset(AggregationBuffer agg) throws HiveException {
+      Aggregation myAgg = (Aggregation) agg;
+      myAgg.isNull = true;
+    }
+
+    @Override
+    public Object evaluateOutput(
+        AggregationBuffer agg) throws HiveException {
+      Aggregation myagg = (Aggregation) agg;
+      if (myagg.isNull) {
+        return null;
+      }
+      else {
+        result.set(myagg.bytes, 0, myagg.length);
+        return result;
+      }
+    }
+    
+    @Override
+    public ObjectInspector getOutputObjectInspector() {
+      return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+    }
+}
+

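Both string aggregates reuse the aggregation buffer's byte array across rows, allocating only when an incoming value is longer than the current backing array. A minimal sketch of that grow-only pattern, with illustrative names:

    // Grow-only buffer reuse as in Aggregation.assign: the array never
    // shrinks, and a separate length field tracks the logical size.
    final class ReusableBytes {
      byte[] buf = new byte[16];   // mirrors MIN_BUFFER_SIZE above
      int len;

      void assign(byte[] src, int start, int srcLen) {
        if (buf.length < srcLen) {
          buf = new byte[srcLen];  // rare path: grow to fit
        }
        System.arraycopy(src, start, buf, 0, srcLen);
        len = srcLen;              // buf.length may exceed len
      }
    }
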
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java?rev=1489089&r1=1489088&r2=1489089&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/CodeGen.java Mon Jun  3 18:15:16 2013
@@ -194,9 +194,8 @@ public class CodeGen {
         {"VectorUDAFMinMax", "VectorUDAFMaxLong", "long", ">", "max", "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: long)"},
         {"VectorUDAFMinMax", "VectorUDAFMaxDouble", "double", ">", "max", "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: double)"},
 
-        //template, <ClassName>, <ValueType>
-        {"VectorUDAFCount", "VectorUDAFCountLong", "long"},
-        {"VectorUDAFCount", "VectorUDAFCountDouble", "double"},
+        {"VectorUDAFMinMaxString", "VectorUDAFMinString", "<", "min", "_FUNC_(expr) - Returns the minimum value of expr (vectorized, type: string)"},
+        {"VectorUDAFMinMaxString", "VectorUDAFMaxString", ">", "max", "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: string)"},
 
         //template, <ClassName>, <ValueType>
         {"VectorUDAFSum", "VectorUDAFSumLong", "long"},
@@ -280,6 +279,8 @@ public class CodeGen {
         generateVectorUDAFCount(tdesc);
       } else if (tdesc[0].equals("VectorUDAFMinMax")) {
         generateVectorUDAFMinMax(tdesc);
+      } else if (tdesc[0].equals("VectorUDAFMinMaxString")) {
+        generateVectorUDAFMinMaxString(tdesc);
       } else if (tdesc[0].equals("VectorUDAFSum")) {
         generateVectorUDAFSum(tdesc);
       } else if (tdesc[0].equals("VectorUDAFAvg")) {
@@ -323,6 +324,25 @@ public class CodeGen {
 
   }
 
+  private void generateVectorUDAFMinMaxString(String[] tdesc) throws Exception {
+    String className = tdesc[1];
+    String operatorSymbol = tdesc[2];
+    String descName = tdesc[3];
+    String descValue = tdesc[4];
+
+    String outputFile = joinPath(this.outputDirectory, className + ".java");
+    String templateFile = joinPath(this.templateDirectory, tdesc[0] + ".txt");
+
+    String templateString = readFile(templateFile);
+    templateString = templateString.replaceAll("<ClassName>", className);
+    templateString = templateString.replaceAll("<OperatorSymbol>", operatorSymbol);
+    templateString = templateString.replaceAll("<DescriptionName>", descName);
+    templateString = templateString.replaceAll("<DescriptionValue>", descValue);
+    writeFile(outputFile, templateString);
+
+  }
+
+
   private void generateVectorUDAFCount(String[] tdesc) throws IOException {
     String className = tdesc[1];
     String valueType = tdesc[2];

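The new generateVectorUDAFMinMaxString method is plain token substitution over the template file. For example, the "max" row of the table above would expand roughly as follows (file paths elided; readFile and writeFile are the helpers CodeGen already uses):

    // Roughly what the "max" table entry produces:
    String templateString = readFile("VectorUDAFMinMaxString.txt");
    templateString = templateString
        .replaceAll("<ClassName>", "VectorUDAFMaxString")
        .replaceAll("<OperatorSymbol>", ">")
        .replaceAll("<DescriptionName>", "max")
        .replaceAll("<DescriptionValue>",
            "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: string)");
    writeFile("VectorUDAFMaxString.java", templateString);
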
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt?rev=1489089&r1=1489088&r2=1489089&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFCount.txt Mon Jun  3 18:15:16 2013
@@ -44,7 +44,7 @@ import org.apache.hadoop.hive.serde2.obj
 /**
 * <ClassName>. Vectorized implementation for COUNT aggregates. 
 */
-@Description(name = "count", value = "_FUNC_(expr) - Returns the maximum value of expr (vectorized, type: <ValueType>)")
+@Description(name = "count", value = "_FUNC_(expr) - Returns the count (vectorized)")
 public class <ClassName> extends VectorAggregateExpression {
     
     /** 
@@ -94,7 +94,7 @@ public class <ClassName> extends VectorA
 
       inputExpression.evaluate(batch);
       
-      <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
+      ColumnVector inputVector = (ColumnVector)batch.
         cols[this.inputExpression.getOutputColumn()];
 
       if (inputVector.noNulls) {
@@ -172,7 +172,7 @@ public class <ClassName> extends VectorA
       
       inputExpression.evaluate(batch);
       
-      <InputColumnVectorType> inputVector = (<InputColumnVectorType>)batch.
+      ColumnVector inputVector = (ColumnVector)batch.
         cols[this.inputExpression.getOutputColumn()];
       
       int batchSize = batch.size;

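The template now casts to the ColumnVector base class rather than a typed vector: COUNT never reads the values themselves, only the batch-level flags and the null bitmap, all of which are declared on the base class. A sketch of the only state it touches, assuming batch is the current VectorizedRowBatch and col the output column index:

    // All of these members exist on every ColumnVector subtype
    // (LongColumnVector, DoubleColumnVector, BytesColumnVector, ...).
    ColumnVector v = batch.cols[col];
    boolean noNulls = v.noNulls;         // batch-level "no nulls" flag
    boolean repeating = v.isRepeating;   // one value stands for the batch
    boolean[] isNull = v.isNull;         // per-row null flags
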
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt?rev=1489089&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/templates/VectorUDAFMinMaxString.txt Mon Jun  3 18:15:16 2013
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.
+    VectorAggregateExpression.AggregationBuffer;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+* <ClassName>. Vectorized implementation for MIN/MAX aggregates. 
+*/
+@Description(name = "<DescriptionName>", value = "<DescriptionValue>")
+public class <ClassName> extends VectorAggregateExpression {
+
+    /**
+     * Class for storing the current aggregate value.
+     */
+    static private final class Aggregation implements AggregationBuffer {
+
+      final static int MIN_BUFFER_SIZE = 16;
+      byte[] bytes = new byte[MIN_BUFFER_SIZE];
+      int length;
+      boolean isNull;
+
+      public void checkValue(byte[] bytes, int start, int length) {
+        if (isNull) {
+          isNull = false;
+          assign(bytes, start, length);
+        } else if (StringExpr.compare(
+                bytes, start, length,
+                this.bytes, 0, this.length) <OperatorSymbol> 0) {
+          assign(bytes, start, length);
+        }
+      }
+      
+      public void assign(byte[] bytes, int start, int length) {
+        // Avoid new allocation if possible
+        if (this.bytes.length < length) {
+          this.bytes = new byte[length];
+        }
+        System.arraycopy(bytes, start, this.bytes, 0, length);
+        this.length = length;
+      }
+    }
+    
+    private VectorExpression inputExpression;
+    private BytesWritable result;
+    
+    public <ClassName>(VectorExpression inputExpression) {
+      super();
+      this.inputExpression = inputExpression;
+      result = new BytesWritable();
+    }
+    
+    private Aggregation getCurrentAggregationBuffer(
+        VectorAggregationBufferRow[] aggregationBufferSets,
+        int aggregrateIndex,
+        int row) {
+      VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+      Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+      return myagg;
+    }
+    
+    @Override
+    public void aggregateInputSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex, 
+      VectorizedRowBatch batch) throws HiveException {
+      
+      int batchSize = batch.size;
+      
+      if (batchSize == 0) {
+        return;
+      }
+      
+      inputExpression.evaluate(batch);
+      
+      BytesColumnVector inputColumn = (BytesColumnVector)batch.
+        cols[this.inputExpression.getOutputColumn()];
+
+      if (inputColumn.noNulls) {
+        if (inputColumn.isRepeating) {
+          iterateNoNullsRepeatingWithAggregationSelection(
+            aggregationBufferSets, aggregrateIndex,
+            inputColumn, batchSize);
+        } else {
+          if (batch.selectedInUse) {
+            iterateNoNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batch.selected, batchSize);
+          } else {
+            iterateNoNullsWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize);
+          }
+        }
+      } else {
+        if (inputColumn.isRepeating) {
+          // All nulls, no-op for min/max
+        } else {
+          if (batch.selectedInUse) {
+            iterateHasNullsSelectionWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize, batch.selected);
+          } else {
+            iterateHasNullsWithAggregationSelection(
+              aggregationBufferSets, aggregrateIndex,
+              inputColumn, batchSize);
+          }
+        }
+      }
+    }
+
+    private void iterateNoNullsRepeatingWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+
+      byte[] bytes = inputColumn.vector[0];
+      int start = inputColumn.start[0];
+      int length = inputColumn.length[0];
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(bytes, start, length);
+      }
+    } 
+
+    private void iterateNoNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int[] selection,
+      int batchSize) {
+      
+      for (int i=0; i < batchSize; ++i) {
+        int row = selection[i];
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(inputColumn.vector[row],
+          inputColumn.start[row],
+          inputColumn.length[row]);
+      }
+    }
+
+    private void iterateNoNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+      for (int i=0; i < batchSize; ++i) {
+        Aggregation myagg = getCurrentAggregationBuffer(
+          aggregationBufferSets, 
+          aggregrateIndex,
+          i);
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    private void iterateHasNullsSelectionWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize,
+      int[] selection) {
+
+      for (int i=0; i < batchSize; ++i) {
+        int row = selection[i];
+        if (!inputColumn.isNull[row]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            aggregrateIndex,
+            i);
+          myagg.checkValue(inputColumn.vector[row],
+            inputColumn.start[row],
+            inputColumn.length[row]);
+        }
+      }
+    }
+
+    private void iterateHasNullsWithAggregationSelection(
+      VectorAggregationBufferRow[] aggregationBufferSets,
+      int aggregrateIndex,
+      BytesColumnVector inputColumn,
+      int batchSize) {
+
+      for (int i=0; i < batchSize; ++i) {
+        if (!inputColumn.isNull[i]) {
+          Aggregation myagg = getCurrentAggregationBuffer(
+            aggregationBufferSets, 
+            aggregrateIndex,
+            i);
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+    
+    @Override
+    public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch) 
+      throws HiveException {
+        
+        inputExpression.evaluate(batch);
+        
+        BytesColumnVector inputColumn = (BytesColumnVector)batch.
+            cols[this.inputExpression.getOutputColumn()];
+        
+        int batchSize = batch.size;
+        
+        if (batchSize == 0) {
+          return;
+        }
+        
+        Aggregation myagg = (Aggregation)agg;
+          
+        if (inputColumn.isRepeating) {
+          if (inputColumn.noNulls) {
+            myagg.checkValue(inputColumn.vector[0],
+              inputColumn.start[0],
+              inputColumn.length[0]);
+          }
+          return;
+        }
+        
+        if (!batch.selectedInUse && inputColumn.noNulls) {
+          iterateNoSelectionNoNulls(myagg, inputColumn, batchSize);
+        }
+        else if (!batch.selectedInUse) {
+          iterateNoSelectionHasNulls(myagg, inputColumn, batchSize);
+        }
+        else if (inputColumn.noNulls){
+          iterateSelectionNoNulls(myagg, inputColumn, batchSize, batch.selected);
+        }
+        else {
+          iterateSelectionHasNulls(myagg, inputColumn, batchSize, batch.selected);
+        }
+    }
+  
+    private void iterateSelectionHasNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize,
+        int[] selected) {
+      
+      for (int j=0; j< batchSize; ++j) {
+        int i = selected[j];
+        if (!inputColumn.isNull[i]) {
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+
+    private void iterateSelectionNoNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize, 
+        int[] selected) {
+      
+      for (int j=0; j < batchSize; ++j) {
+        int i = selected[j];
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    private void iterateNoSelectionHasNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn, 
+        int batchSize) {
+      
+      for (int i=0; i< batchSize; ++i) {
+        if (!inputColumn.isNull[i]) {
+          myagg.checkValue(inputColumn.vector[i],
+            inputColumn.start[i],
+            inputColumn.length[i]);
+        }
+      }
+    }
+
+    private void iterateNoSelectionNoNulls(
+        Aggregation myagg, 
+        BytesColumnVector inputColumn,
+        int batchSize) {
+      for (int i=0; i< batchSize; ++i) {
+        myagg.checkValue(inputColumn.vector[i],
+          inputColumn.start[i],
+          inputColumn.length[i]);
+      }
+    }
+
+    @Override
+    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+      return new Aggregation();
+    }
+
+    @Override
+    public void reset(AggregationBuffer agg) throws HiveException {
+      Aggregation myAgg = (Aggregation) agg;
+      myAgg.isNull = true;
+    }
+
+    @Override
+    public Object evaluateOutput(
+        AggregationBuffer agg) throws HiveException {
+      Aggregation myagg = (Aggregation) agg;
+      if (myagg.isNull) {
+        return null;
+      }
+      else {
+        result.set(myagg.bytes, 0, myagg.length);
+        return result;
+      }
+    }
+    
+    @Override
+    public ObjectInspector getOutputObjectInspector() {
+      return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+    }
+}
+
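
A note on the shape of the template above: the iterate* variants cover the
four combinations of (batch.selectedInUse, inputColumn.noNulls), so the hot
path (no selection vector, no nulls) runs a tight loop with no per-row
branches, and isRepeating short-circuits to a single check. Below is a
minimal sketch of that dispatch collapsed into one method; SimpleBatch and
SimpleColumn are hypothetical stand-ins, not the real VectorizedRowBatch or
BytesColumnVector API.

    import java.util.function.IntConsumer;

    // Hypothetical stand-ins for the batch/column fields the dispatch reads.
    class SimpleBatch  { int size; boolean selectedInUse; int[] selected; }
    class SimpleColumn { boolean isRepeating; boolean noNulls; boolean[] isNull; }

    class IterateSketch {
      // The template unrolls this into four specialized loops; the collapsed
      // form here is for illustration only.
      static void iterate(SimpleBatch batch, SimpleColumn col, IntConsumer checkRow) {
        if (col.isRepeating) {
          // One distinct value stands for the whole batch.
          if (col.noNulls || !col.isNull[0]) {
            checkRow.accept(0);
          }
          return;
        }
        for (int j = 0; j < batch.size; ++j) {
          // With a selection vector in use, 'selected' maps to the live rows.
          int i = batch.selectedInUse ? batch.selected[j] : j;
          if (col.noNulls || !col.isNull[i]) {
            checkRow.accept(i);
          }
        }
      }
    }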

Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java?rev=1489089&r1=1489088&r2=1489089&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java Mon Jun  3 18:15:16 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -66,9 +67,10 @@ public class TestVectorGroupByOperator {
   private static AggregationDesc buildAggregationDesc(
       VectorizationContext ctx,
       String aggregate,
-      String column) {
+      String column,
+      TypeInfo typeInfo) {
 
-    ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, TypeInfoFactory.longTypeInfo);
+    ExprNodeDesc inputColumn = buildColumnDesc(ctx, column, typeInfo);
 
     ArrayList<ExprNodeDesc> params = new ArrayList<ExprNodeDesc>();
     params.add(inputColumn);
@@ -88,12 +90,13 @@ public class TestVectorGroupByOperator {
   }
 
 
-  private static GroupByDesc buildGroupByDesc(
+  private static GroupByDesc buildGroupByDescLong(
       VectorizationContext ctx,
       String aggregate,
       String column) {
 
-    AggregationDesc agg = buildAggregationDesc(ctx, aggregate, column);
+    AggregationDesc agg = buildAggregationDesc(ctx, aggregate,
+        column, TypeInfoFactory.longTypeInfo);
     ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
     aggs.add(agg);
 
@@ -106,6 +109,28 @@ public class TestVectorGroupByOperator {
 
     return desc;
   }
+
+  private static GroupByDesc buildGroupByDescString(
+      VectorizationContext ctx,
+      String aggregate,
+      String column) {
+
+    AggregationDesc agg = buildAggregationDesc(ctx, aggregate,
+        column, TypeInfoFactory.stringTypeInfo);
+    ArrayList<AggregationDesc> aggs = new ArrayList<AggregationDesc>();
+    aggs.add(agg);
+
+    ArrayList<String> outputColumnNames = new ArrayList<String>();
+    outputColumnNames.add("_col0");
+
+    GroupByDesc desc = new GroupByDesc();
+    desc.setOutputColumnNames(outputColumnNames);
+    desc.setAggregators(aggs);
+
+    return desc;
+  }
+
+
   private static GroupByDesc buildGroupByDescCountStar(
       VectorizationContext ctx) {
 
@@ -131,7 +156,7 @@ public class TestVectorGroupByOperator {
       TypeInfo typeInfo,
       String key) {
 
-    GroupByDesc desc = buildGroupByDesc(ctx, aggregate, column);
+    GroupByDesc desc = buildGroupByDescLong(ctx, aggregate, column);
 
     ExprNodeDesc keyExp = buildColumnDesc(ctx, key, typeInfo);
     ArrayList<ExprNodeDesc> keys = new ArrayList<ExprNodeDesc>();
@@ -150,6 +175,76 @@ public class TestVectorGroupByOperator {
   }
 
   @Test
+  public void testCountString () throws HiveException {
+    testAggregateString(
+        "count",
+        2,
+        Arrays.asList(new Object[]{"A","B","C"}),
+        3L);
+  }
+
+  @Test
+  public void testMaxString () throws HiveException {
+    testAggregateString(
+        "max",
+        2,
+        Arrays.asList(new Object[]{"A","B","C"}),
+        "C");
+    testAggregateString(
+        "max",
+        2,
+        Arrays.asList(new Object[]{"C", "B", "A"}),
+        "C");
+  }
+
+  @Test
+  public void testMinString () throws HiveException {
+    testAggregateString(
+        "min",
+        2,
+        Arrays.asList(new Object[]{"A","B","C"}),
+        "A");
+    testAggregateString(
+        "min",
+        2,
+        Arrays.asList(new Object[]{"C", "B", "A"}),
+        "A");
+  }
+
+  @Test
+  public void testMaxNullString () throws HiveException {
+    testAggregateString(
+        "max",
+        2,
+        Arrays.asList(new Object[]{"A","B",null}),
+        "B");
+    testAggregateString(
+        "max",
+        2,
+        Arrays.asList(new Object[]{null, null, null}),
+        null);
+  }
+
+  @Test
+  public void testCountStringWithNull () throws HiveException {
+    testAggregateString(
+        "count",
+        2,
+        Arrays.asList(new Object[]{"A",null,"C", "D", null}),
+        3L);
+  }
+
+  @Test
+  public void testCountStringAllNull () throws HiveException {
+    testAggregateString(
+        "count",
+        4,
+        Arrays.asList(new Object[]{null, null, null, null, null}),
+        0L);
+  }
+
+
+  @Test
   public void testMinLongNullStringKeys() throws HiveException {
     testAggregateStringKeyAggregate(
         "min",
@@ -969,6 +1064,19 @@ public class TestVectorGroupByOperator {
     testAggregateLongKeyIterable (aggregateName, fdr, expected);
   }
 
+  public void testAggregateString (
+      String aggregateName,
+      int batchSize,
+      Iterable<Object> values,
+      Object expected) throws HiveException {
+
+    @SuppressWarnings("unchecked")
+    FakeVectorRowBatchFromObjectIterables fdr = new FakeVectorRowBatchFromObjectIterables(
+        batchSize, new String[] {"string"}, values);
+    testAggregateStringIterable (aggregateName, fdr, expected);
+  }
+
+
   public void testAggregateLongAggregate (
       String aggregateName,
       int batchSize,
@@ -1001,14 +1109,19 @@ public class TestVectorGroupByOperator {
 
       assertEquals(true, result instanceof Object[]);
       Object[] arr = (Object[]) result;
-      assertEquals (1, arr.length);
+      assertEquals(1, arr.length);
 
       if (expected == null) {
         assertNull (arr[0]);
-      } else {
-        assertEquals (true, arr[0] instanceof LongWritable);
+      } else if (arr[0] instanceof LongWritable) {
         LongWritable lw = (LongWritable) arr[0];
-        assertEquals ((Long) expected, (Long) lw.get());
+        assertEquals((Long) expected, (Long) lw.get());
+      } else if (arr[0] instanceof BytesWritable) {
+        BytesWritable bw = (BytesWritable) arr[0];
+        String sbw = new String(bw.getBytes(), 0, bw.getLength());
+        assertEquals((String) expected, sbw);
+      } else {
+        Assert.fail("Unsupported result type: "
+            + (arr[0] == null ? "null" : arr[0].getClass().getName()));
       }
     }
   }
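
One detail worth calling out in the BytesWritable branch above: getBytes()
exposes the backing array, whose capacity can exceed the valid data, so any
conversion must be bounded by getLength(). A minimal illustration of the
safe form:

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.BytesWritable;

    class BytesWritableToString {
      // Honors the valid length, ignoring padding in the backing array
      // beyond getLength().
      static String toStringSafe(BytesWritable bw) {
        return new String(bw.getBytes(), 0, bw.getLength(), StandardCharsets.UTF_8);
      }
    }
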
@@ -1159,6 +1272,37 @@ public class TestVectorGroupByOperator {
     validator.validate(expected, result);
   }
 
+  public void testAggregateStringIterable (
+      String aggregateName,
+      Iterable<VectorizedRowBatch> data,
+      Object expected) throws HiveException {
+    Map<String, Integer> mapColumnNames = new HashMap<String, Integer>();
+    mapColumnNames.put("A", 0);
+    VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
+
+    GroupByDesc desc = buildGroupByDescString (ctx, aggregateName, "A");
+
+    VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);
+
+    FakeCaptureOutputOperator out = FakeCaptureOutputOperator.addCaptureOutputChild(vgo);
+    vgo.initialize(null, null);
+
+    for (VectorizedRowBatch unit: data) {
+      vgo.process(unit, 0);
+    }
+    vgo.close(false);
+
+    List<Object> outBatchList = out.getCapturedRows();
+    assertNotNull(outBatchList);
+    assertEquals(1, outBatchList.size());
+
+    Object result = outBatchList.get(0);
+
+    Validator validator = getValidator(aggregateName);
+    validator.validate(expected, result);
+  }
+
+
   public void testAggregateLongIterable (
       String aggregateName,
       Iterable<VectorizedRowBatch> data,
@@ -1167,7 +1311,7 @@ public class TestVectorGroupByOperator {
     mapColumnNames.put("A", 0);
     VectorizationContext ctx = new VectorizationContext(mapColumnNames, 1);
 
-    GroupByDesc desc = buildGroupByDesc (ctx, aggregateName, "A");
+    GroupByDesc desc = buildGroupByDescLong (ctx, aggregateName, "A");
 
     VectorGroupByOperator vgo = new VectorGroupByOperator(ctx, desc);