Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:54 UTC

[07/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
index 6311335..5bb6bf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.ql.session.SessionState;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
index 1625ce1..271e3c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.ql.log.PerfLogger;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
index 2535b10..3aebbe1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;
@@ -79,7 +96,9 @@ class RenderStrategy {
               perfLogger.PerfLogBegin(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
             }
 
-            perfLogger.PerfLogEnd(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+            if (!perfLogger.endTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+              perfLogger.PerfLogEnd(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+            }
           }
           if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
 

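The RenderStrategy hunk above makes the vertex timer idempotent on the end side: PerfLogEnd now fires only when no end time has been recorded for that vertex yet, so late progress updates cannot overwrite the first recorded completion. A minimal standalone sketch of the end-once pattern, with plain maps standing in for Hive's PerfLogger (the containsKey check mirrors what the endTimeHasMethod guard is used for above):

import java.util.HashMap;
import java.util.Map;

class TimerSketch {
  private final Map<String, Long> beginTimes = new HashMap<>();
  private final Map<String, Long> endTimes = new HashMap<>();

  void begin(String key) {
    // Mirrors the existing PerfLogBegin call: open the span once.
    beginTimes.putIfAbsent(key, System.nanoTime());
  }

  void endIfOpen(String key) {
    // Mirrors the new guard: skip the end call when an end time already
    // exists, so a later update cannot clobber the first completion time.
    if (!endTimes.containsKey(key)) {
      endTimes.put(key, System.nanoTime());
    }
  }
}
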
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
index 3475fc2..9739ad7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.common.log.ProgressMonitor;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
index 630046d..84128e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
@@ -57,7 +57,7 @@ public class VectorAggregationBufferBatch {
   /**
    * Memory consumed by a set of aggregation buffers
    */
-  private int aggregatorsFixedSize;
+  private long aggregatorsFixedSize;
 
   /**
    * Array of indexes for aggregators that have variable size
@@ -76,7 +76,7 @@ public class VectorAggregationBufferBatch {
    * Returns the fixed size consumed by the aggregation buffers
    * @return
    */
-  public int getAggregatorsFixedSize() {
+  public long getAggregatorsFixedSize() {
     return aggregatorsFixedSize;
   }
 

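The int-to-long widening above matters because the fixed size is effectively a per-buffer size multiplied by the number of live aggregation buffers, and that product can exceed Integer.MAX_VALUE on large hash aggregations. A small sketch of the failure mode (the byte and entry counts are made up, chosen only to force the wrap):

public class OverflowSketch {
  public static void main(String[] args) {
    int perBufferBytes = 1_024;        // assumed fixed size of one aggregation buffer
    int bufferCount = 3_000_000;       // assumed number of live hash-table entries
    int asInt = perBufferBytes * bufferCount;          // wraps: prints a negative number
    long asLong = (long) perBufferBytes * bufferCount; // correct: 3072000000
    System.out.println(asInt + " vs " + asLong);
  }
}
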
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
index 935b47b..7ac4f07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
@@ -20,10 +20,8 @@ package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.util.Arrays;
 
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
  * Class to keep information on a set of typed vector columns.  Used by
@@ -64,147 +62,87 @@ public class VectorColumnSetInfo {
    */
   protected int[] intervalDayTimeIndices;
 
-  /**
-   * Helper class for looking up a key value based on key index.
-   */
-  public class KeyLookupHelper {
-    public int longIndex;
-    public int doubleIndex;
-    public int stringIndex;
-    public int decimalIndex;
-    public int timestampIndex;
-    public int intervalDayTimeIndex;
-
-    private static final int INDEX_UNUSED = -1;
-
-    private void resetIndices() {
-        this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex =
-            timestampIndex = intervalDayTimeIndex = INDEX_UNUSED;
-    }
-    public void setLong(int index) {
-      resetIndices();
-      this.longIndex= index;
-    }
-
-    public void setDouble(int index) {
-      resetIndices();
-      this.doubleIndex = index;
-    }
-
-    public void setString(int index) {
-      resetIndices();
-      this.stringIndex = index;
-    }
-
-    public void setDecimal(int index) {
-      resetIndices();
-      this.decimalIndex = index;
-    }
-
-    public void setTimestamp(int index) {
-      resetIndices();
-      this.timestampIndex= index;
-    }
-
-    public void setIntervalDayTime(int index) {
-      resetIndices();
-      this.intervalDayTimeIndex= index;
-    }
-  }
-
-  /**
-   * Lookup vector to map from key index to primitive type index.
-   */
-  protected KeyLookupHelper[] indexLookup;
+  final protected int keyCount;
+  private int addKeyIndex;
 
-  private int keyCount;
-  private int addIndex;
+  private int addLongIndex;
+  private int addDoubleIndex;
+  private int addStringIndex;
+  private int addDecimalIndex;
+  private int addTimestampIndex;
+  private int addIntervalDayTimeIndex;
 
-  protected int longIndicesIndex;
-  protected int doubleIndicesIndex;
-  protected int stringIndicesIndex;
-  protected int decimalIndicesIndex;
-  protected int timestampIndicesIndex;
-  protected int intervalDayTimeIndicesIndex;
+  // Given the keyIndex these arrays return:
+  //   The ColumnVector.Type,
+  //   The type specific index into longIndices, doubleIndices, etc...
+  protected ColumnVector.Type[] columnVectorTypes;
+  protected int[] columnTypeSpecificIndices;
 
   protected VectorColumnSetInfo(int keyCount) {
     this.keyCount = keyCount;
-    this.addIndex = 0;
+    this.addKeyIndex = 0;
 
     // We'll over allocate and then shrink the array for each type
     longIndices = new int[this.keyCount];
-    longIndicesIndex = 0;
+    addLongIndex = 0;
     doubleIndices = new int[this.keyCount];
-    doubleIndicesIndex  = 0;
+    addDoubleIndex  = 0;
     stringIndices = new int[this.keyCount];
-    stringIndicesIndex = 0;
+    addStringIndex = 0;
     decimalIndices = new int[this.keyCount];
-    decimalIndicesIndex = 0;
+    addDecimalIndex = 0;
     timestampIndices = new int[this.keyCount];
-    timestampIndicesIndex = 0;
+    addTimestampIndex = 0;
     intervalDayTimeIndices = new int[this.keyCount];
-    intervalDayTimeIndicesIndex = 0;
-    indexLookup = new KeyLookupHelper[this.keyCount];
-  }
+    addIntervalDayTimeIndex = 0;
 
-  protected void addKey(String outputType) throws HiveException {
-    indexLookup[addIndex] = new KeyLookupHelper();
+    columnVectorTypes = new ColumnVector.Type[this.keyCount];
+    columnTypeSpecificIndices = new int[this.keyCount];
+  }
 
-    String typeName = VectorizationContext.mapTypeNameSynonyms(outputType);
 
-    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
-    Type columnVectorType = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
+  protected void addKey(ColumnVector.Type columnVectorType) throws HiveException {
 
     switch (columnVectorType) {
     case LONG:
-      longIndices[longIndicesIndex] = addIndex;
-      indexLookup[addIndex].setLong(longIndicesIndex);
-      ++longIndicesIndex;
+      longIndices[addLongIndex] = addKeyIndex;
+      columnTypeSpecificIndices[addKeyIndex] = addLongIndex++;
       break;
-
     case DOUBLE:
-      doubleIndices[doubleIndicesIndex] = addIndex;
-      indexLookup[addIndex].setDouble(doubleIndicesIndex);
-      ++doubleIndicesIndex;
+      doubleIndices[addDoubleIndex] = addKeyIndex;
+      columnTypeSpecificIndices[addKeyIndex] = addDoubleIndex++;
       break;
-
     case BYTES:
-      stringIndices[stringIndicesIndex]= addIndex;
-      indexLookup[addIndex].setString(stringIndicesIndex);
-      ++stringIndicesIndex;
+      stringIndices[addStringIndex]= addKeyIndex;
+      columnTypeSpecificIndices[addKeyIndex] = addStringIndex++;
       break;
-
     case DECIMAL:
-      decimalIndices[decimalIndicesIndex]= addIndex;
-      indexLookup[addIndex].setDecimal(decimalIndicesIndex);
-      ++decimalIndicesIndex;
-      break;
-
+      decimalIndices[addDecimalIndex]= addKeyIndex;
+      columnTypeSpecificIndices[addKeyIndex] = addDecimalIndex++;
+      break;
     case TIMESTAMP:
-      timestampIndices[timestampIndicesIndex] = addIndex;
-      indexLookup[addIndex].setTimestamp(timestampIndicesIndex);
-      ++timestampIndicesIndex;
+      timestampIndices[addTimestampIndex] = addKeyIndex;
+      columnTypeSpecificIndices[addKeyIndex] = addTimestampIndex++;
       break;
-
     case INTERVAL_DAY_TIME:
-      intervalDayTimeIndices[intervalDayTimeIndicesIndex] = addIndex;
-      indexLookup[addIndex].setIntervalDayTime(intervalDayTimeIndicesIndex);
-      ++intervalDayTimeIndicesIndex;
+      intervalDayTimeIndices[addIntervalDayTimeIndex] = addKeyIndex;
+      columnTypeSpecificIndices[addKeyIndex] = addIntervalDayTimeIndex++;
       break;
-
     default:
       throw new HiveException("Unexpected column vector type " + columnVectorType);
     }
 
-    addIndex++;
+    columnVectorTypes[addKeyIndex] = columnVectorType;
+    addKeyIndex++;
   }
 
-  protected void finishAdding() {
-    longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
-    doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
-    stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex);
-    decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex);
-    timestampIndices = Arrays.copyOf(timestampIndices, timestampIndicesIndex);
-    intervalDayTimeIndices = Arrays.copyOf(intervalDayTimeIndices, intervalDayTimeIndicesIndex);
+
+  protected void finishAdding() throws HiveException {
+    longIndices = Arrays.copyOf(longIndices, addLongIndex);
+    doubleIndices = Arrays.copyOf(doubleIndices, addDoubleIndex);
+    stringIndices = Arrays.copyOf(stringIndices, addStringIndex);
+    decimalIndices = Arrays.copyOf(decimalIndices, addDecimalIndex);
+    timestampIndices = Arrays.copyOf(timestampIndices, addTimestampIndex);
+    intervalDayTimeIndices = Arrays.copyOf(intervalDayTimeIndices, addIntervalDayTimeIndex);
   }
 }
\ No newline at end of file

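The VectorColumnSetInfo rewrite above drops the per-key KeyLookupHelper objects in favor of two parallel arrays: columnVectorTypes[keyIndex] holds the vector type and columnTypeSpecificIndices[keyIndex] holds the slot within the matching per-type array (longIndices, doubleIndices, and so on). A reduced sketch of that bookkeeping, assuming nothing beyond what the diff shows:

class KeyIndexSketch {
  enum Type { LONG, DOUBLE, BYTES }

  private final Type[] columnVectorTypes;          // keyIndex -> vector type
  private final int[] columnTypeSpecificIndices;   // keyIndex -> slot in the per-type array
  private int addKeyIndex, addLongIndex, addDoubleIndex, addStringIndex;

  KeyIndexSketch(int keyCount) {
    columnVectorTypes = new Type[keyCount];
    columnTypeSpecificIndices = new int[keyCount];
  }

  void addKey(Type t) {
    switch (t) {
    case LONG:   columnTypeSpecificIndices[addKeyIndex] = addLongIndex++;   break;
    case DOUBLE: columnTypeSpecificIndices[addKeyIndex] = addDoubleIndex++; break;
    case BYTES:  columnTypeSpecificIndices[addKeyIndex] = addStringIndex++; break;
    }
    columnVectorTypes[addKeyIndex++] = t;
  }

  // Two array reads replace the old per-key helper object.
  String describe(int keyIndex) {
    return columnVectorTypes[keyIndex] + "#" + columnTypeSpecificIndices[keyIndex];
  }
}
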
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index 94eaf56..defaf90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -150,6 +150,25 @@ public class VectorExtractRow {
   }
 
   /*
+   * Initialize using a TypeInfo array and a column projection array.
+   */
+  public void init(TypeInfo[] typeInfos, int[] projectedColumns)
+      throws HiveException {
+
+    final int count = typeInfos.length;
+    allocateArrays(count);
+
+    for (int i = 0; i < count; i++) {
+
+      int projectionColumnNum = projectedColumns[i];
+
+      TypeInfo typeInfo = typeInfos[i];
+
+      initEntry(i, projectionColumnNum, typeInfo);
+    }
+  }
+
+  /*
    * Initialize using data type names.
    * No projection -- the column range 0 .. types.size()-1
    */

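The new init overload above binds logical field i to physical batch column projectedColumns[i], so later extraction reads from the projected column rather than column i itself. A toy model of that binding (not the Hive class; allocateArrays and initEntry are the real helpers, here only the mapping is recorded):

class ProjectedInitSketch {
  private int[] projectionColumnNums;
  private String[] typeNames;

  void init(String[] typeInfos, int[] projectedColumns) {
    final int count = typeInfos.length;    // one entry per logical field
    projectionColumnNums = new int[count];
    typeNames = new String[count];
    for (int i = 0; i < count; i++) {
      // Stands in for initEntry(i, projectionColumnNum, typeInfo).
      projectionColumnNums[i] = projectedColumns[i];
      typeNames[i] = typeInfos[i];
    }
  }

  int physicalColumn(int logicalField) {
    return projectionColumnNums[logicalField];
  }
}
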
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index fef7c2a..30916a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -22,16 +22,21 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.ref.SoftReference;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -52,6 +57,8 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javolution.util.FastBitSet;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
@@ -110,6 +117,24 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
 
   private transient VectorAssignRow vectorAssignRow;
 
+  /*
+   * Grouping sets members.
+   */
+  private transient boolean groupingSetsPresent;
+
+  // The field bits (i.e. which fields to include) or "id" for each grouping set.
+  private transient int[] groupingSets;
+
+  // The position in the column keys of the dummy grouping set id column.
+  private transient int groupingSetsPosition;
+
+  // The planner puts a constant field in for the dummy grouping set id.  We will overwrite it
+  // as we process the grouping sets.
+  private transient ConstantVectorExpression groupingSetsDummyVectorExpression;
+
+  // We translate each grouping set bit field into a boolean array.
+  private transient boolean[][] allGroupingSetsOverrideIsNulls;
+
   private transient int numEntriesHashTable;
 
   private transient long maxHashTblMemory;
@@ -144,6 +169,32 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
       // Do nothing.
     }
 
+    protected abstract void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
+        boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException;
+
+    @Override
+    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+
+      if (!groupingSetsPresent) {
+        doProcessBatch(batch, false, null);
+        return;
+      }
+
+      // We drive the doProcessBatch logic with the same batch but different
+      // grouping set id and null variation.
+      // PERFORMANCE NOTE: We do not try to reuse columns; we generate the KeyWrappers anew...
+
+      final int size = groupingSets.length;
+      for (int i = 0; i < size; i++) {
+
+        // NOTE: We are overwriting the constant vector value...
+        groupingSetsDummyVectorExpression.setLongValue(groupingSets[i]);
+        groupingSetsDummyVectorExpression.evaluate(batch);
+
+        doProcessBatch(batch, (i == 0), allGroupingSetsOverrideIsNulls[i]);
+      }
+    }
+
     /**
      * Evaluates the aggregators on the current batch.
      * The aggregationBatchInfo must have been prepared
@@ -207,7 +258,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     }
 
     @Override
-    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+    public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
+        boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
       for (int i = 0; i < aggregators.length; ++i) {
         aggregators[i].aggregateInput(aggregationBuffers.getAggregationBuffer(i), batch);
       }
@@ -234,7 +286,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     /**
      * Total per hashtable entry fixed memory (does not depend on key/agg values).
      */
-    private int fixedHashEntrySize;
+    private long fixedHashEntrySize;
 
     /**
      * Average per hashtable entry variable size memory (depends on key/agg value).
@@ -322,17 +374,32 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
             HiveConf.ConfVars.HIVEGROUPBYMAPINTERVAL.defaultIntVal;
       }
 
+      sumBatchSize = 0;
+
       mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
       computeMemoryLimits();
       LOG.debug("using hash aggregation processing mode");
     }
 
     @Override
-    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+    public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
+        boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
+
+      if (!groupingSetsPresent || isFirstGroupingSet) {
+
+        // Evaluate the key expressions once.
+        for(int i = 0; i < keyExpressions.length; ++i) {
+          keyExpressions[i].evaluate(batch);
+        }
+      }
 
       // First we traverse the batch to evaluate and prepare the KeyWrappers
       // After this the KeyWrappers are properly set and hash code is computed
-      keyWrappersBatch.evaluateBatch(batch);
+      if (!groupingSetsPresent) {
+        keyWrappersBatch.evaluateBatch(batch);
+      } else {
+        keyWrappersBatch.evaluateBatchGroupingSets(batch, currentGroupingSetsOverrideIsNulls);
+      }
 
       // Next we locate the aggregation buffer set for each key
       prepareBatchAggregationBufferSets(batch);
@@ -391,10 +458,18 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
       // to bump its internal version.
       aggregationBatchInfo.startBatch();
 
+      if (batch.size == 0) {
+        return;
+      }
+
       // We now have to probe the global hash and find-or-allocate
       // the aggregation buffers to use for each key present in the batch
       VectorHashKeyWrapper[] keyWrappers = keyWrappersBatch.getVectorHashKeyWrappers();
-      for (int i=0; i < batch.size; ++i) {
+
+      final int n = keyExpressions.length == 0 ? 1 : batch.size;
+      // note - the row mapping is not relevant when aggregationBatchInfo::getDistinctBufferSetCount() == 1
+
+      for (int i=0; i < n; ++i) {
         VectorHashKeyWrapper kw = keyWrappers[i];
         VectorAggregationBufferRow aggregationBuffer = mapKeysAggregationBuffers.get(kw);
         if (null == aggregationBuffer) {
@@ -607,10 +682,24 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     }
 
     @Override
-    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+    public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
+        boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
+
+      if (!groupingSetsPresent || isFirstGroupingSet) {
+
+        // Evaluate the key expressions once.
+        for(int i = 0; i < keyExpressions.length; ++i) {
+          keyExpressions[i].evaluate(batch);
+        }
+      }
+
       // First we traverse the batch to evaluate and prepare the KeyWrappers
       // After this the KeyWrappers are properly set and hash code is computed
-      keyWrappersBatch.evaluateBatch(batch);
+      if (!groupingSetsPresent) {
+        keyWrappersBatch.evaluateBatch(batch);
+      } else {
+        keyWrappersBatch.evaluateBatchGroupingSets(batch, currentGroupingSetsOverrideIsNulls);
+      }
 
       VectorHashKeyWrapper[] batchKeys = keyWrappersBatch.getVectorHashKeyWrappers();
 
@@ -702,7 +791,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     @Override
     public void initialize(Configuration hconf) throws HiveException {
       inGroup = false;
-      groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length);
+
+      // We do not include the dummy grouping set column in the output.  So we pass outputKeyLength
+      // instead of keyExpressions.length
+      groupKeyHelper = new VectorGroupKeyHelper(outputKeyLength);
       groupKeyHelper.init(keyExpressions);
       groupAggregators = allocateAggregationBuffer();
       buffer = new DataOutputBuffer();
@@ -725,11 +817,18 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     }
 
     @Override
-    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+    public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
+        boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
       assert(inGroup);
       if (first) {
         // Copy the group key to output batch now.  We'll copy in the aggregates at the end of the group.
         first = false;
+
+        // Evaluate the key expressions of just this first batch to get the correct key.
+        for (int i = 0; i < outputKeyLength; i++) {
+          keyExpressions[i].evaluate(batch);
+        }
+
         groupKeyHelper.copyGroupKey(batch, outputBatch, buffer);
       }
 
@@ -778,6 +877,49 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     super(ctx);
   }
 
+  private void setupGroupingSets() {
+
+    groupingSetsPresent = conf.isGroupingSetsPresent();
+    if (!groupingSetsPresent) {
+      groupingSets = null;
+      groupingSetsPosition = -1;
+      groupingSetsDummyVectorExpression = null;
+      allGroupingSetsOverrideIsNulls = null;
+      return;
+    }
+
+    groupingSets = ArrayUtils.toPrimitive(conf.getListGroupingSets().toArray(new Integer[0]));
+    groupingSetsPosition = conf.getGroupingSetPosition();
+
+    allGroupingSetsOverrideIsNulls = new boolean[groupingSets.length][];
+
+    int pos = 0;
+    for (int groupingSet: groupingSets) {
+
+      // Create the mapping corresponding to the grouping set
+
+      // Assume all columns are null, except the dummy column is always non-null.
+      boolean[] groupingSetsOverrideIsNull = new boolean[keyExpressions.length];
+      Arrays.fill(groupingSetsOverrideIsNull, true);
+      groupingSetsOverrideIsNull[groupingSetsPosition] = false;
+
+      // Add keys of this grouping set.
+      FastBitSet bitset = GroupByOperator.groupingSet2BitSet(groupingSet, groupingSetsPosition);
+      for (int keyPos = bitset.nextClearBit(0); keyPos < groupingSetsPosition;
+        keyPos = bitset.nextClearBit(keyPos+1)) {
+        groupingSetsOverrideIsNull[keyPos] = false;
+      }
+
+      allGroupingSetsOverrideIsNulls[pos] =  groupingSetsOverrideIsNull;
+      pos++;
+    }
+
+    // The last key column is the dummy grouping set id.
+    //
+    // Figure out which (scratch) column was used so we can overwrite the dummy id.
+
+    groupingSetsDummyVectorExpression = (ConstantVectorExpression) keyExpressions[groupingSetsPosition];
+  }
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
@@ -831,15 +973,19 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
 
     forwardCache = new Object[outputKeyLength + aggregators.length];
 
+    setupGroupingSets();
+
     switch (vectorDesc.getProcessingMode()) {
     case GLOBAL:
       Preconditions.checkState(outputKeyLength == 0);
+      Preconditions.checkState(!groupingSetsPresent);
       processingMode = this.new ProcessingModeGlobalAggregate();
       break;
     case HASH:
       processingMode = this.new ProcessingModeHashAggregate();
       break;
     case MERGE_PARTIAL:
+      Preconditions.checkState(!groupingSetsPresent);
       processingMode = this.new ProcessingModeReduceMergePartial();
       break;
     case STREAMING:

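The grouping-sets support above has two moving parts: processBatch replays the same batch once per grouping set after rewriting the constant dummy-id column, and setupGroupingSets precomputes a per-set boolean mask saying which keys are forced to NULL. A self-contained sketch of the mask construction, using java.util.BitSet in place of javolution's FastBitSet; the bit encoding below (a set bit marks a rolled-up key) is an assumption standing in for GroupByOperator.groupingSet2BitSet:

import java.util.Arrays;
import java.util.BitSet;

public class GroupingSetMaskSketch {

  // Assumed encoding: bit k of the grouping-set id is set when key k is
  // rolled up (emitted as NULL). The real mapping lives in
  // GroupByOperator.groupingSet2BitSet.
  static BitSet toBitSet(int groupingSet, int keyCount) {
    BitSet bits = new BitSet(keyCount);
    for (int k = 0; k < keyCount; k++) {
      if ((groupingSet & (1 << k)) != 0) {
        bits.set(k);
      }
    }
    return bits;
  }

  static boolean[] overrideIsNull(int groupingSet, int groupingSetsPosition) {
    // Start pessimistic: every key is null, except the dummy id column.
    boolean[] mask = new boolean[groupingSetsPosition + 1];
    Arrays.fill(mask, true);
    mask[groupingSetsPosition] = false;

    // Clear bits identify the keys this grouping set actually groups by,
    // mirroring the nextClearBit loop in setupGroupingSets.
    BitSet bits = toBitSet(groupingSet, groupingSetsPosition);
    for (int keyPos = bits.nextClearBit(0); keyPos < groupingSetsPosition;
         keyPos = bits.nextClearBit(keyPos + 1)) {
      mask[keyPos] = false;
    }
    return mask;
  }

  public static void main(String[] args) {
    // Two keys (a, b) plus the dummy id at position 2; grouping set 0b10
    // rolls up key b, so only key a stays non-null: [false, true, false]
    System.out.println(Arrays.toString(overrideIsNull(0b10, 2)));
  }
}
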
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
index 50d0452..64706ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
@@ -19,8 +19,12 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
 
 /**
@@ -28,14 +32,25 @@ import org.apache.hadoop.io.DataOutputBuffer;
  */
 public class VectorGroupKeyHelper extends VectorColumnSetInfo {
 
+  private int[] outputColumnNums;
+
   public VectorGroupKeyHelper(int keyCount) {
     super(keyCount);
    }
 
   void init(VectorExpression[] keyExpressions) throws HiveException {
-    // Inspect the output type of each key expression.
-    for(int i=0; i < keyExpressions.length; ++i) {
-      addKey(keyExpressions[i].getOutputType());
+
+    // NOTE: To support pruning the grouping set id dummy key in the VectorGroupByOperator MERGE_PARTIAL
+    // case, we use the keyCount passed to the constructor and not keyExpressions.length.
+
+    // Inspect the output type of each key expression.  And, remember the output columns.
+    outputColumnNums = new int[keyCount];
+    for(int i=0; i < keyCount; ++i) {
+      String typeName = VectorizationContext.mapTypeNameSynonyms(keyExpressions[i].getOutputType());
+      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+      Type columnVectorType = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
+      addKey(columnVectorType);
+      outputColumnNums[i] = keyExpressions[i].getOutputColumn();
     }
     finishAdding();
   }
@@ -50,9 +65,9 @@ public class VectorGroupKeyHelper extends VectorColumnSetInfo {
   public void copyGroupKey(VectorizedRowBatch inputBatch, VectorizedRowBatch outputBatch,
           DataOutputBuffer buffer) throws HiveException {
     for(int i = 0; i< longIndices.length; ++i) {
-      int keyIndex = longIndices[i];
-      LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[keyIndex];
-      LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[keyIndex];
+      final int columnIndex = outputColumnNums[longIndices[i]];
+      LongColumnVector inputColumnVector = (LongColumnVector) inputBatch.cols[columnIndex];
+      LongColumnVector outputColumnVector = (LongColumnVector) outputBatch.cols[columnIndex];
 
       // This vectorized code pattern says: 
       //    If the input batch has no nulls at all (noNulls is true) OR
@@ -76,9 +91,9 @@ public class VectorGroupKeyHelper extends VectorColumnSetInfo {
       }
     }
     for(int i=0;i<doubleIndices.length; ++i) {
-      int keyIndex = doubleIndices[i];
-      DoubleColumnVector inputColumnVector = (DoubleColumnVector) inputBatch.cols[keyIndex];
-      DoubleColumnVector outputColumnVector = (DoubleColumnVector) outputBatch.cols[keyIndex];
+      final int columnIndex = outputColumnNums[doubleIndices[i]];
+      DoubleColumnVector inputColumnVector = (DoubleColumnVector) inputBatch.cols[columnIndex];
+      DoubleColumnVector outputColumnVector = (DoubleColumnVector) outputBatch.cols[columnIndex];
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
         outputColumnVector.vector[outputBatch.size] = inputColumnVector.vector[0];
       } else {
@@ -87,9 +102,9 @@ public class VectorGroupKeyHelper extends VectorColumnSetInfo {
       }
     }
     for(int i=0;i<stringIndices.length; ++i) {
-      int keyIndex = stringIndices[i];
-      BytesColumnVector inputColumnVector = (BytesColumnVector) inputBatch.cols[keyIndex];
-      BytesColumnVector outputColumnVector = (BytesColumnVector) outputBatch.cols[keyIndex];
+      final int columnIndex = outputColumnNums[stringIndices[i]];
+      BytesColumnVector inputColumnVector = (BytesColumnVector) inputBatch.cols[columnIndex];
+      BytesColumnVector outputColumnVector = (BytesColumnVector) outputBatch.cols[columnIndex];
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
         // Copy bytes into scratch buffer.
         int start = buffer.getLength();
@@ -106,9 +121,9 @@ public class VectorGroupKeyHelper extends VectorColumnSetInfo {
       }
     }
     for(int i=0;i<decimalIndices.length; ++i) {
-      int keyIndex = decimalIndices[i];
-      DecimalColumnVector inputColumnVector = (DecimalColumnVector) inputBatch.cols[keyIndex];
-      DecimalColumnVector outputColumnVector = (DecimalColumnVector) outputBatch.cols[keyIndex];
+      final int columnIndex = outputColumnNums[decimalIndices[i]];
+      DecimalColumnVector inputColumnVector = (DecimalColumnVector) inputBatch.cols[columnIndex];
+      DecimalColumnVector outputColumnVector = (DecimalColumnVector) outputBatch.cols[columnIndex];
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
 
         // Since we store references to HiveDecimalWritable instances, we must use the update method instead
@@ -120,9 +135,9 @@ public class VectorGroupKeyHelper extends VectorColumnSetInfo {
       }
     }
     for(int i=0;i<timestampIndices.length; ++i) {
-      int keyIndex = timestampIndices[i];
-      TimestampColumnVector inputColumnVector = (TimestampColumnVector) inputBatch.cols[keyIndex];
-      TimestampColumnVector outputColumnVector = (TimestampColumnVector) outputBatch.cols[keyIndex];
+      final int columnIndex = outputColumnNums[timestampIndices[i]];
+      TimestampColumnVector inputColumnVector = (TimestampColumnVector) inputBatch.cols[columnIndex];
+      TimestampColumnVector outputColumnVector = (TimestampColumnVector) outputBatch.cols[columnIndex];
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
 
         outputColumnVector.setElement(outputBatch.size, 0, inputColumnVector);
@@ -132,9 +147,9 @@ public class VectorGroupKeyHelper extends VectorColumnSetInfo {
       }
     }
     for(int i=0;i<intervalDayTimeIndices.length; ++i) {
-      int keyIndex = intervalDayTimeIndices[i];
-      IntervalDayTimeColumnVector inputColumnVector = (IntervalDayTimeColumnVector) inputBatch.cols[keyIndex];
-      IntervalDayTimeColumnVector outputColumnVector = (IntervalDayTimeColumnVector) outputBatch.cols[keyIndex];
+      final int columnIndex = outputColumnNums[intervalDayTimeIndices[i]];
+      IntervalDayTimeColumnVector inputColumnVector = (IntervalDayTimeColumnVector) inputBatch.cols[columnIndex];
+      IntervalDayTimeColumnVector outputColumnVector = (IntervalDayTimeColumnVector) outputBatch.cols[columnIndex];
       if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
 
         outputColumnVector.setElement(outputBatch.size, 0, inputColumnVector);

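copyGroupKey above now resolves the physical batch column through outputColumnNums instead of treating the key position itself as the column number, which is what lets key expressions that write to scratch columns be copied correctly. The indirection in isolation (field names mirror the diff; the class is illustrative only):

class GroupKeyColumnSketch {
  int[] longIndices;        // per-type list of key positions, from VectorColumnSetInfo
  int[] outputColumnNums;   // key position -> output column of its key expression

  int physicalLongColumn(int i) {
    // Before: batch.cols[longIndices[i]] -- key position used as column number.
    // After:  one extra hop through the key expression's output column.
    return outputColumnNums[longIndices[i]];
  }
}
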
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
index 5de59b1..3e1fcdd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import org.apache.hive.common.util.Murmur3;
+
 import java.sql.Timestamp;
 import java.util.Arrays;
 
@@ -30,6 +32,8 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A hash map key wrapper for vectorized processing.
  * It stores the key values as primitives in arrays for each supported primitive type.
@@ -39,6 +43,17 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  */
 public class VectorHashKeyWrapper extends KeyWrapper {
 
+  public static final class HashContext {
+    private final Murmur3.IncrementalHash32 bytesHash = new Murmur3.IncrementalHash32();
+
+    public static Murmur3.IncrementalHash32 getBytesHash(HashContext ctx) {
+      if (ctx == null) {
+        return new Murmur3.IncrementalHash32();
+      }
+      return ctx.bytesHash;
+    }
+  }
+
   private static final int[] EMPTY_INT_ARRAY = new int[0];
   private static final long[] EMPTY_LONG_ARRAY = new long[0];
   private static final double[] EMPTY_DOUBLE_ARRAY = new double[0];
@@ -59,15 +74,25 @@ public class VectorHashKeyWrapper extends KeyWrapper {
   private HiveDecimalWritable[] decimalValues;
 
   private Timestamp[] timestampValues;
+  private static Timestamp ZERO_TIMESTAMP = new Timestamp(0);
 
   private HiveIntervalDayTime[] intervalDayTimeValues;
+  private static HiveIntervalDayTime ZERO_INTERVALDAYTIME= new HiveIntervalDayTime(0, 0);
 
+  // NOTE: The null array is indexed by keyIndex, which is not available internally.  The mapping
+  //       from a long, double, etc index to key index is kept once in the separate
+  //       VectorColumnSetInfo object.
   private boolean[] isNull;
+
   private int hashcode;
 
-  private VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount,
+  private HashContext hashCtx;
+
+  private VectorHashKeyWrapper(HashContext ctx, int longValuesCount, int doubleValuesCount,
           int byteValuesCount, int decimalValuesCount, int timestampValuesCount,
-          int intervalDayTimeValuesCount) {
+          int intervalDayTimeValuesCount,
+          int keyCount) {
+    hashCtx = ctx;
     longValues = longValuesCount > 0 ? new long[longValuesCount] : EMPTY_LONG_ARRAY;
     doubleValues = doubleValuesCount > 0 ? new double[doubleValuesCount] : EMPTY_DOUBLE_ARRAY;
     decimalValues = decimalValuesCount > 0 ? new HiveDecimalWritable[decimalValuesCount] : EMPTY_DECIMAL_ARRAY;
@@ -91,23 +116,23 @@ public class VectorHashKeyWrapper extends KeyWrapper {
     for(int i = 0; i < intervalDayTimeValuesCount; ++i) {
       intervalDayTimeValues[i] = new HiveIntervalDayTime();
     }
-    isNull = new boolean[longValuesCount + doubleValuesCount + byteValuesCount +
-                         decimalValuesCount + timestampValuesCount + intervalDayTimeValuesCount];
+    isNull = new boolean[keyCount];
     hashcode = 0;
   }
 
   private VectorHashKeyWrapper() {
   }
 
-  public static VectorHashKeyWrapper allocate(int longValuesCount, int doubleValuesCount,
+  public static VectorHashKeyWrapper allocate(HashContext ctx, int longValuesCount, int doubleValuesCount,
       int byteValuesCount, int decimalValuesCount, int timestampValuesCount,
-      int intervalDayTimeValuesCount) {
+      int intervalDayTimeValuesCount, int keyCount) {
     if ((longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount
         + timestampValuesCount + intervalDayTimeValuesCount) == 0) {
       return EMPTY_KEY_WRAPPER;
     }
-    return new VectorHashKeyWrapper(longValuesCount, doubleValuesCount, byteValuesCount,
-        decimalValuesCount, timestampValuesCount, intervalDayTimeValuesCount);
+    return new VectorHashKeyWrapper(ctx, longValuesCount, doubleValuesCount, byteValuesCount,
+        decimalValuesCount, timestampValuesCount, intervalDayTimeValuesCount,
+        keyCount);
   }
 
   @Override
@@ -117,45 +142,44 @@ public class VectorHashKeyWrapper extends KeyWrapper {
 
   @Override
   public void setHashKey() {
-    hashcode = Arrays.hashCode(longValues) ^
+    // compute locally and assign
+    int hash = Arrays.hashCode(longValues) ^
         Arrays.hashCode(doubleValues) ^
         Arrays.hashCode(isNull);
 
     for (int i = 0; i < decimalValues.length; i++) {
       // Use the new faster hash code since we are hashing memory objects.
-      hashcode ^= decimalValues[i].newFasterHashCode();
+      hash ^= decimalValues[i].newFasterHashCode();
     }
 
     for (int i = 0; i < timestampValues.length; i++) {
-      hashcode ^= timestampValues[i].hashCode();
+      hash ^= timestampValues[i].hashCode();
     }
 
     for (int i = 0; i < intervalDayTimeValues.length; i++) {
-      hashcode ^= intervalDayTimeValues[i].hashCode();
+      hash ^= intervalDayTimeValues[i].hashCode();
     }
 
     // This code, with branches and all, is not executed if there are no string keys
+    Murmur3.IncrementalHash32 bytesHash = null;
     for (int i = 0; i < byteValues.length; ++i) {
       /*
        *  Hashing the string is potentially expensive so is better to branch.
        *  Additionally not looking at values for nulls allows us not reset the values.
        */
-      if (!isNull[longValues.length + doubleValues.length + i]) {
-        byte[] bytes = byteValues[i];
-        int start = byteStarts[i];
-        int length = byteLengths[i];
-        if (length == bytes.length && start == 0) {
-          hashcode ^= Arrays.hashCode(bytes);
-        }
-        else {
-          // Unfortunately there is no Arrays.hashCode(byte[], start, length)
-          for(int j = start; j < start + length; ++j) {
-            // use 461 as is a (sexy!) prime.
-            hashcode ^= 461 * bytes[j];
-          }
-        }
+      if (byteLengths[i] == -1) {
+        continue;
+      }
+      if (bytesHash == null) {
+        bytesHash = HashContext.getBytesHash(hashCtx);
+        bytesHash.start(hash);
       }
+      bytesHash.add(byteValues[i], byteStarts[i], byteLengths[i]);
     }
+    if (bytesHash != null) {
+      hash = bytesHash.end();
+    }
+    this.hashcode = hash;
   }
 
   @Override
@@ -167,6 +191,7 @@ public class VectorHashKeyWrapper extends KeyWrapper {
   public boolean equals(Object that) {
     if (that instanceof VectorHashKeyWrapper) {
       VectorHashKeyWrapper keyThat = (VectorHashKeyWrapper)that;
+      // not comparing hashCtx - irrelevant
       return hashcode == keyThat.hashcode &&
           Arrays.equals(longValues, keyThat.longValues) &&
           Arrays.equals(doubleValues, keyThat.doubleValues) &&
@@ -184,7 +209,7 @@ public class VectorHashKeyWrapper extends KeyWrapper {
     //By the time we enter here the byteValues.length and isNull must have already been compared
     for (int i = 0; i < byteValues.length; ++i) {
       // the byte comparison is potentially expensive so is better to branch on null
-      if (!isNull[longValues.length + doubleValues.length + i]) {
+      if (byteLengths[i] != -1) {
         if (!StringExpr.equal(
             byteValues[i],
             byteStarts[i],
@@ -207,6 +232,7 @@ public class VectorHashKeyWrapper extends KeyWrapper {
   }
 
   public void duplicateTo(VectorHashKeyWrapper clone) {
+    clone.hashCtx = hashCtx;
     clone.longValues = (longValues.length > 0) ? longValues.clone() : EMPTY_LONG_ARRAY;
     clone.doubleValues = (doubleValues.length > 0) ? doubleValues.clone() : EMPTY_DOUBLE_ARRAY;
     clone.isNull = isNull.clone();
@@ -228,7 +254,7 @@ public class VectorHashKeyWrapper extends KeyWrapper {
       for (int i = 0; i < byteValues.length; ++i) {
         // avoid allocation/copy of nulls, because it potentially expensive.
         // branch instead.
-        if (!isNull[longValues.length + doubleValues.length + i]) {
+        if (byteLengths[i] != -1) {
           clone.byteValues[i] = Arrays.copyOfRange(byteValues[i],
               byteStarts[i], byteStarts[i] + byteLengths[i]);
         }
@@ -274,106 +300,141 @@ public class VectorHashKeyWrapper extends KeyWrapper {
     throw new UnsupportedOperationException();
   }
 
-  public void assignDouble(int index, double d) {
-    doubleValues[index] = d;
-    isNull[longValues.length + index] = false;
+  public void assignLong(int index, long v) {
+    longValues[index] = v;
   }
 
-  public void assignNullDouble(int index) {
-    doubleValues[index] = 0; // assign 0 to simplify hashcode
-    isNull[longValues.length + index] = true;
+  public void assignNullLong(int keyIndex, int index) {
+    isNull[keyIndex] = true;
+    longValues[index] = 0; // assign 0 to simplify hashcode
   }
 
-  public void assignLong(int index, long v) {
-    longValues[index] = v;
-    isNull[index] = false;
+  public void assignDouble(int index, double d) {
+    doubleValues[index] = d;
   }
 
-  public void assignNullLong(int index) {
-    longValues[index] = 0; // assign 0 to simplify hashcode
-    isNull[index] = true;
+  public void assignNullDouble(int keyIndex, int index) {
+    isNull[keyIndex] = true;
+    doubleValues[index] = 0; // assign 0 to simplify hashcode
   }
 
   public void assignString(int index, byte[] bytes, int start, int length) {
+    Preconditions.checkState(bytes != null);
     byteValues[index] = bytes;
     byteStarts[index] = start;
     byteLengths[index] = length;
-    isNull[longValues.length + doubleValues.length + index] = false;
   }
 
-  public void assignNullString(int index) {
-    // We do not assign the value to byteValues[] because the value is never used on null
-    isNull[longValues.length + doubleValues.length + index] = true;
+  public void assignNullString(int keyIndex, int index) {
+    isNull[keyIndex] = true;
+    byteValues[index] = null;
+    byteStarts[index] = 0;
+    // We need some value that indicates NULL.
+    byteLengths[index] = -1;
   }
 
   public void assignDecimal(int index, HiveDecimalWritable value) {
     decimalValues[index].set(value);
-    isNull[longValues.length + doubleValues.length + byteValues.length + index] = false;
   }
 
-  public void assignNullDecimal(int index) {
-      isNull[longValues.length + doubleValues.length + byteValues.length + index] = true;
+  public void assignNullDecimal(int keyIndex, int index) {
+    isNull[keyIndex] = true;
+    decimalValues[index].set(HiveDecimal.ZERO); // assign 0 to simplify hashcode
   }
 
   public void assignTimestamp(int index, Timestamp value) {
     timestampValues[index] = value;
-    isNull[longValues.length + doubleValues.length + byteValues.length +
-           decimalValues.length + index] = false;
   }
 
   public void assignTimestamp(int index, TimestampColumnVector colVector, int elementNum) {
     colVector.timestampUpdate(timestampValues[index], elementNum);
-    isNull[longValues.length + doubleValues.length + byteValues.length +
-           decimalValues.length + index] = false;
   }
 
-  public void assignNullTimestamp(int index) {
-      isNull[longValues.length + doubleValues.length + byteValues.length +
-             decimalValues.length + index] = true;
+  public void assignNullTimestamp(int keyIndex, int index) {
+    isNull[keyIndex] = true;
+    timestampValues[index] = ZERO_TIMESTAMP; // assign 0 to simplify hashcode
   }
 
   public void assignIntervalDayTime(int index, HiveIntervalDayTime value) {
     intervalDayTimeValues[index].set(value);
-    isNull[longValues.length + doubleValues.length + byteValues.length +
-           decimalValues.length + timestampValues.length + index] = false;
   }
 
   public void assignIntervalDayTime(int index, IntervalDayTimeColumnVector colVector, int elementNum) {
     intervalDayTimeValues[index].set(colVector.asScratchIntervalDayTime(elementNum));
-    isNull[longValues.length + doubleValues.length + byteValues.length +
-           decimalValues.length + timestampValues.length + index] = false;
   }
 
-  public void assignNullIntervalDayTime(int index) {
-      isNull[longValues.length + doubleValues.length + byteValues.length +
-             decimalValues.length + timestampValues.length + index] = true;
+  public void assignNullIntervalDayTime(int keyIndex, int index) {
+    isNull[keyIndex] = true;
+    intervalDayTimeValues[index] = ZERO_INTERVALDAYTIME; // assign 0 to simplify hashcode
   }
 
   @Override
   public String toString()
   {
-    return String.format("%d[%s] %d[%s] %d[%s] %d[%s] %d[%s] %d[%s]",
-        longValues.length, Arrays.toString(longValues),
-        doubleValues.length, Arrays.toString(doubleValues),
-        byteValues.length, Arrays.toString(byteValues),
-        decimalValues.length, Arrays.toString(decimalValues),
-        timestampValues.length, Arrays.toString(timestampValues),
-        intervalDayTimeValues.length, Arrays.toString(intervalDayTimeValues));
-  }
-
-  public boolean getIsLongNull(int i) {
-    return isNull[i];
-  }
+    StringBuilder sb = new StringBuilder();
+    boolean isFirst = true;
+    if (longValues.length > 0) {
+      isFirst = false;
+      sb.append("longs ");
+      sb.append(Arrays.toString(longValues));
+    }
+    if (doubleValues.length > 0) {
+      if (isFirst) {
+        isFirst = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append("doubles ");
+      sb.append(Arrays.toString(doubleValues));
+    }
+    if (byteValues.length > 0) {
+      if (isFirst) {
+        isFirst = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append("byte lengths ");
+      sb.append(Arrays.toString(byteLengths));
+    }
+    if (decimalValues.length > 0) {
+      if (isFirst) {
+        isFirst = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append("decimals ");
+      sb.append(Arrays.toString(decimalValues));
+    }
+    if (timestampValues.length > 0) {
+      if (isFirst) {
+        isFirst = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append("timestamps ");
+      sb.append(Arrays.toString(timestampValues));
+    }
+    if (intervalDayTimeValues.length > 0) {
+      if (isFirst) {
+        isFirst = false;
+      } else {
+        sb.append(", ");
+      }
+      sb.append("interval day times ");
+      sb.append(Arrays.toString(intervalDayTimeValues));
+    }
 
-  public boolean getIsDoubleNull(int i) {
-    return isNull[longValues.length + i];
-  }
+    if (isFirst) {
+      isFirst = false;
+    } else {
+      sb.append(", ");
+    }
+    sb.append("nulls ");
+    sb.append(Arrays.toString(isNull));
 
-  public boolean getIsBytesNull(int i) {
-    return isNull[longValues.length + doubleValues.length + i];
+    return sb.toString();
   }
 
-
   public long getLongValue(int i) {
     return longValues[i];
   }
@@ -403,35 +464,29 @@ public class VectorHashKeyWrapper extends KeyWrapper {
     return variableSize;
   }
 
-  public boolean getIsDecimalNull(int i) {
-    return isNull[longValues.length + doubleValues.length + byteValues.length + i];
-  }
-
   public HiveDecimalWritable getDecimal(int i) {
     return decimalValues[i];
   }
 
-  public boolean getIsTimestampNull(int i) {
-    return isNull[longValues.length + doubleValues.length + byteValues.length +
-                  decimalValues.length + i];
-  }
-
   public Timestamp getTimestamp(int i) {
     return timestampValues[i];
   }
 
-  public boolean getIsIntervalDayTimeNull(int i) {
-    return isNull[longValues.length + doubleValues.length + byteValues.length +
-                  decimalValues.length + timestampValues.length + i];
-  }
-
   public HiveIntervalDayTime getIntervalDayTime(int i) {
     return intervalDayTimeValues[i];
   }
 
+  public void clearIsNull() {
+    Arrays.fill(isNull, false);
+  }
+
+  public boolean isNull(int keyIndex) {
+    return isNull[keyIndex];
+  }
+
   public static final class EmptyVectorHashKeyWrapper extends VectorHashKeyWrapper {
     private EmptyVectorHashKeyWrapper() {
-      super(0, 0, 0, 0, 0, 0);
+      super(null, 0, 0, 0, 0, 0, 0, /* keyCount */ 0);
       // no need to override assigns - all assign ops will fail due to 0 size
     }
 
@@ -451,4 +506,3 @@ public class VectorHashKeyWrapper extends KeyWrapper {
     }
   }
 }
-
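
Two patterns from the VectorHashKeyWrapper hunk are worth isolating: byteLengths[i] == -1 becomes the NULL sentinel for string keys (replacing the old flattened isNull offset arithmetic), and the string contribution to the hash is folded through a lazily started incremental hasher. A self-contained sketch, with a placeholder mixer standing in for org.apache.hive.common.util.Murmur3.IncrementalHash32 (whose start/add/end calls are shown in the diff):

public class BytesHashSketch {

  // Stand-in for Murmur3.IncrementalHash32; the mixing function here is
  // arbitrary and only the start/add/end shape matches the real class.
  static final class IncHash {
    private int h;
    void start(int seed) { h = seed; }
    void add(byte[] b, int start, int len) {
      for (int i = start; i < start + len; i++) {
        h = h * 31 + b[i];   // placeholder mix, not real Murmur3
      }
    }
    int end() { return h; }
  }

  static int hashBytes(int hash, byte[][] byteValues, int[] byteStarts, int[] byteLengths) {
    IncHash bytesHash = null;
    for (int i = 0; i < byteValues.length; i++) {
      if (byteLengths[i] == -1) {
        continue;                   // NULL string key: nothing to hash, per the sentinel
      }
      if (bytesHash == null) {
        bytesHash = new IncHash();  // started lazily, at most once per setHashKey
        bytesHash.start(hash);
      }
      bytesHash.add(byteValues[i], byteStarts[i], byteLengths[i]);
    }
    return bytesHash != null ? bytesHash.end() : hash;
  }

  public static void main(String[] args) {
    byte[][] vals = { "ab".getBytes(), null };          // second key is NULL
    int[] starts = { 0, 0 };
    int[] lengths = { 2, -1 };                          // -1 marks the NULL key
    System.out.println(hashBytes(42, vals, starts, lengths));
  }
}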