Posted to commits@hive.apache.org by mm...@apache.org on 2015/11/01 05:32:52 UTC

[07/10] hive git commit: HIVE-12290 Native Vector ReduceSink (Matt McCline, reviewed by Gopal V)

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSingleImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSingleImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSingleImpl.java
new file mode 100644
index 0000000..bf0a25b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/keyseries/VectorKeySeriesSingleImpl.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.keyseries;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation for when a single key series or a serialized key series is being presented.
+ *
+ */
+public abstract class VectorKeySeriesSingleImpl extends VectorKeySeriesImpl
+    implements VectorKeySeries {
+
+  private static final Log LOG = LogFactory.getLog(VectorKeySeriesSingleImpl.class.getName());
+
+  protected int currentBatchSize;
+
+  // The number of keys (with sequential duplicates collapsed, both NULL and non-NULL) in the batch.
+  protected int seriesCount;
+
+  // The current position in the key series.
+  protected int seriesPosition;
+
+  // The number of duplicates for each series key (NULL or non-NULL).
+  protected final int[] duplicateCounts;
+
+  // Whether a series key is NULL.
+  protected final boolean[] seriesIsAllNull;
+
+  // The number of non-NULL keys.  They have associated hash codes and key data. 
+  protected int nonNullKeyCount;
+
+  // The current non-NULL key position.
+  protected int nonNullKeyPosition;
+
+  // The hash code for each non-NULL key.
+  protected final int[] hashCodes;
+
+  VectorKeySeriesSingleImpl() {
+    super();
+
+    seriesCount = 0;
+    seriesPosition = 0;
+
+    duplicateCounts = new int[VectorizedRowBatch.DEFAULT_SIZE];
+    seriesIsAllNull = new boolean[VectorizedRowBatch.DEFAULT_SIZE];
+
+    nonNullKeyCount = 0;
+    nonNullKeyPosition = -1;
+
+    hashCodes = new int[VectorizedRowBatch.DEFAULT_SIZE];
+  }
+
+  public boolean validate() {
+    Preconditions.checkState(seriesCount > 0);
+    Preconditions.checkState(seriesCount <= currentBatchSize);
+    Preconditions.checkState(nonNullKeyCount >= 0);
+    Preconditions.checkState(nonNullKeyCount <= seriesCount);
+
+    validateDuplicateCount();
+    return true;
+  }
+
+  private void validateDuplicateCount() {
+    int sum = 0;
+    int duplicateCount;
+    for (int i = 0; i < seriesCount; i++) {
+      duplicateCount = duplicateCounts[i];
+      Preconditions.checkState(duplicateCount > 0);
+      Preconditions.checkState(duplicateCount <= currentBatchSize);
+      sum += duplicateCount;
+    }
+    Preconditions.checkState(sum == currentBatchSize);
+  }
+
+  @Override
+  public void positionToFirst() {
+    seriesPosition = 0;
+
+    currentLogical = 0;
+    currentDuplicateCount = duplicateCounts[0];
+    currentIsAllNull = seriesIsAllNull[0];
+
+    if (!currentIsAllNull) {
+      nonNullKeyPosition = 0;
+      currentHashCode = hashCodes[0];
+      setNextNonNullKey(0);
+    } else {
+      nonNullKeyPosition = -1;
+    }
+    Preconditions.checkState(currentDuplicateCount > 0);
+  }
+
+  // Consumes whole key.
+  @Override
+  public boolean next() {
+
+    currentLogical += currentDuplicateCount;
+    if (currentLogical >= currentBatchSize) {
+      return false;
+    }
+
+    Preconditions.checkState(seriesPosition + 1 < seriesCount);
+
+    seriesPosition++;
+    currentDuplicateCount = duplicateCounts[seriesPosition];
+    currentIsAllNull = seriesIsAllNull[seriesPosition];
+
+    if (!currentIsAllNull) {
+      Preconditions.checkState(nonNullKeyPosition + 1 < nonNullKeyCount);
+      nonNullKeyPosition++;
+      currentHashCode = hashCodes[nonNullKeyPosition];
+      setNextNonNullKey(nonNullKeyPosition);
+    }
+    Preconditions.checkState(currentDuplicateCount > 0);
+    return true;
+  }
+
+  // For use by VectorKeySeriesMulti so that the minimum equal key can be advanced.
+  public void advance(int duplicateCount) {
+
+    currentLogical += currentDuplicateCount;
+
+    currentDuplicateCount -= duplicateCount;
+    if (currentDuplicateCount == 0) {
+      seriesPosition++;
+      currentIsAllNull = seriesIsAllNull[seriesPosition];
+      currentDuplicateCount = duplicateCounts[seriesPosition];
+
+      if (!currentIsAllNull) {
+        nonNullKeyPosition++;
+        currentHashCode = hashCodes[nonNullKeyPosition];
+        setNextNonNullKey(nonNullKeyPosition);
+      }
+    }
+  }
+
+  protected abstract void setNextNonNullKey(int nonNullKeyPosition);
+}
\ No newline at end of file

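For orientation, here is a minimal sketch of how a consumer is expected to walk a key series
once a batch has been processed. The accessor names (getCurrentIsAllNull, getCurrentLogical,
getCurrentDuplicateCount, getCurrentHashCode, next) are the ones used by
VectorReduceSinkCommonOperator.process() later in this commit; the wrapper method and the
batch handling around them are illustrative only, not part of the patch.

    // Sketch: visit each run of equal keys in a batch the key series has already processed.
    void walkKeySeries(VectorKeySeries keySeries, VectorizedRowBatch batch) {
      boolean selectedInUse = batch.selectedInUse;
      int[] selected = batch.selected;
      do {
        if (!keySeries.getCurrentIsAllNull()) {
          int hashCode = keySeries.getCurrentHashCode();   // hash of the current non-NULL key
          // ... look up or emit using the current key ...
        }
        // Visit every row that shares the current (NULL or non-NULL) key.
        int logical = keySeries.getCurrentLogical();
        int end = logical + keySeries.getCurrentDuplicateCount();
        do {
          int batchIndex = (selectedInUse ? selected[logical] : logical);
          // ... process row batchIndex ...
        } while (++logical < end);
      } while (keySeries.next());   // next() returns false once the whole batch is consumed
    }
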
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index afea926..435b438 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -168,7 +168,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
 
   // This helper object deserializes LazyBinary format small table values into columns of a row
   // in a vectorized row batch.
-  protected transient VectorDeserializeRow smallTableVectorDeserializeRow;
+  protected transient VectorDeserializeRow<LazyBinaryDeserializeRead> smallTableVectorDeserializeRow;
 
   // This a 2nd batch with the same "column schema" as the big table batch that can be used to
   // build join output results in.  If we can create some join output results in the big table
@@ -573,10 +573,11 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
      * Create our vectorized copy row and deserialize row helper objects.
      */
     if (smallTableMapping.getCount() > 0) {
-      smallTableVectorDeserializeRow = new VectorDeserializeRow(
-                      new LazyBinaryDeserializeRead(
-                          VectorizedBatchUtil.primitiveTypeInfosFromTypeNames(
-                              smallTableMapping.getTypeNames())));
+      smallTableVectorDeserializeRow =
+          new VectorDeserializeRow<LazyBinaryDeserializeRead>(
+              new LazyBinaryDeserializeRead(
+                  VectorizedBatchUtil.primitiveTypeInfosFromTypeNames(
+                      smallTableMapping.getTypeNames())));
       smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns());
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 260f4e1..4e2bd7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
 import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 
 /**
@@ -73,7 +74,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinGenerateResultOperator.class.getName());
   private static final String CLASS_NAME = VectorMapJoinGenerateResultOperator.class.getName();
 
-  private transient PrimitiveTypeInfo[] bigTablePrimitiveTypeInfos;
+  //------------------------------------------------------------------------------------------------
+
+  private transient TypeInfo[] bigTableTypeInfos;
 
   private transient VectorSerializeRow bigTableVectorSerializeRow;
 
@@ -417,7 +420,7 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
     List<Integer> projectedColumns = vContext.getProjectedColumns();
     int projectionSize = vContext.getProjectedColumns().size();
 
-    List<PrimitiveTypeInfo> typeInfoList = new ArrayList<PrimitiveTypeInfo>();
+    List<TypeInfo> typeInfoList = new ArrayList<TypeInfo>();
     List<Integer> noNullsProjectionList = new ArrayList<Integer>();
     for (int i = 0; i < projectionSize; i++) {
       int projectedColumn = projectedColumns.get(i);
@@ -429,17 +432,19 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
 
     int[] noNullsProjection = ArrayUtils.toPrimitive(noNullsProjectionList.toArray(new Integer[0]));
     int noNullsProjectionSize = noNullsProjection.length;
-    bigTablePrimitiveTypeInfos = typeInfoList.toArray(new PrimitiveTypeInfo[0]);
+    bigTableTypeInfos = typeInfoList.toArray(new TypeInfo[0]);
 
     bigTableVectorSerializeRow =
-            new VectorSerializeRow(new LazyBinarySerializeWrite(noNullsProjectionSize));
+            new VectorSerializeRow<LazyBinarySerializeWrite>(
+                new LazyBinarySerializeWrite(noNullsProjectionSize));
 
     bigTableVectorSerializeRow.init(
-                bigTablePrimitiveTypeInfos,
-                noNullsProjectionList);
+                bigTableTypeInfos,
+                noNullsProjection);
 
-    bigTableVectorDeserializeRow = new VectorDeserializeRow(
-            new LazyBinaryDeserializeRead(bigTablePrimitiveTypeInfos));
+    bigTableVectorDeserializeRow =
+        new VectorDeserializeRow<LazyBinaryDeserializeRead>(
+            new LazyBinaryDeserializeRead(bigTableTypeInfos));
 
     bigTableVectorDeserializeRow.init(noNullsProjection);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
index a2559f8..02a3746 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
 
 // Multi-Key specific imports.
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRowNoNulls;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 
@@ -65,7 +65,7 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
 
   // Object that can take a set of columns in row in a vectorized row batch and serialized it.
   // Known to not have any nulls.
-  private transient VectorSerializeRowNoNulls keyVectorSerializeWriteNoNulls;
+  private transient VectorSerializeRow keyVectorSerializeWrite;
 
   // The BinarySortable serialization of the current key.
   private transient Output currentKeyOutput;
@@ -105,9 +105,9 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
          * Initialize Multi-Key members for this specialized class.
          */
 
-        keyVectorSerializeWriteNoNulls = new VectorSerializeRowNoNulls(
+        keyVectorSerializeWrite = new VectorSerializeRow(
                                         new BinarySortableSerializeWrite(bigTableKeyColumnMap.length));
-        keyVectorSerializeWriteNoNulls.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
+        keyVectorSerializeWrite.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
 
         currentKeyOutput = new Output();
         saveKeyOutput = new Output();
@@ -194,8 +194,12 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
          * Multi-Key specific repeated lookup.
          */
 
-        keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
-        keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, 0);
+        keyVectorSerializeWrite.setOutput(currentKeyOutput);
+        keyVectorSerializeWrite.serializeWrite(batch, 0);
+        if (keyVectorSerializeWrite.getHasAnyNulls()) {
+          // Not expecting NULLs in MapJoin -- they should have been filtered out.
+          throw new HiveException("Null key not expected in MapJoin");
+        }
         byte[] keyBytes = currentKeyOutput.getData();
         int keyLength = currentKeyOutput.getLength();
         JoinUtil.JoinResult joinResult = hashMultiSet.contains(keyBytes, 0, keyLength, hashMultiSetResults[0]);
@@ -248,8 +252,12 @@ public class VectorMapJoinInnerBigOnlyMultiKeyOperator extends VectorMapJoinInne
            */
 
           // Generate binary sortable key for current row in vectorized row batch.
-          keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
-          keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, batchIndex);
+          keyVectorSerializeWrite.setOutput(currentKeyOutput);
+          keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+          if (keyVectorSerializeWrite.getHasAnyNulls()) {
+            // Not expecting NULLs in MapJoin -- they should have been filtered out.
+            throw new HiveException("Null key not expected in MapJoin");
+          }
 
           /*
            * Equal key series checking.

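The same serialize-then-check sequence recurs in each of the specialized multi-key operators
below (inner, left-semi, outer). Condensed into one place, with the method names exactly as
they appear in these hunks:

    // Serialize the multi-column key for one row and reject NULL keys,
    // since MapJoin key NULLs are expected to have been filtered out upstream.
    keyVectorSerializeWrite.setOutput(currentKeyOutput);
    keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
    if (keyVectorSerializeWrite.getHasAnyNulls()) {
      throw new HiveException("Null key not expected in MapJoin");
    }
    byte[] keyBytes = currentKeyOutput.getData();
    int keyLength = currentKeyOutput.getLength();
    // keyBytes[0..keyLength) is the BinarySortable key used for the hash table lookup.

The outer multi-key operator further below performs the same getHasAnyNulls() check but
treats a NULL key as a non-match rather than throwing.
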
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
index 7e58c75..6b63200 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerMultiKeyOperator.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
 
 // Multi-Key specific imports.
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRowNoNulls;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 
@@ -63,7 +63,7 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
 
   // Object that can take a set of columns in row in a vectorized row batch and serialized it.
   // Known to not have any nulls.
-  private transient VectorSerializeRowNoNulls keyVectorSerializeWriteNoNulls;
+  private transient VectorSerializeRow keyVectorSerializeWrite;
 
   // The BinarySortable serialization of the current key.
   private transient Output currentKeyOutput;
@@ -103,9 +103,9 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
          * Initialize Multi-Key members for this specialized class.
          */
 
-        keyVectorSerializeWriteNoNulls = new VectorSerializeRowNoNulls(
+        keyVectorSerializeWrite = new VectorSerializeRow(
                                         new BinarySortableSerializeWrite(bigTableKeyColumnMap.length));
-        keyVectorSerializeWriteNoNulls.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
+        keyVectorSerializeWrite.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
 
         currentKeyOutput = new Output();
         saveKeyOutput = new Output();
@@ -191,8 +191,12 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
          * Multi-Key specific repeated lookup.
          */
 
-        keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
-        keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, 0);
+        keyVectorSerializeWrite.setOutput(currentKeyOutput);
+        keyVectorSerializeWrite.serializeWrite(batch, 0);
+        if (keyVectorSerializeWrite.getHasAnyNulls()) {
+          // Not expecting NULLs in MapJoin -- they should have been filtered out.
+          throw new HiveException("Null key not expected in MapJoin");
+        }
         byte[] keyBytes = currentKeyOutput.getData();
         int keyLength = currentKeyOutput.getLength();
         JoinUtil.JoinResult joinResult = hashMap.lookup(keyBytes, 0, keyLength, hashMapResults[0]);
@@ -245,8 +249,12 @@ public class VectorMapJoinInnerMultiKeyOperator extends VectorMapJoinInnerGenera
            */
 
           // Generate binary sortable key for current row in vectorized row batch.
-          keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
-          keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, batchIndex);
+          keyVectorSerializeWrite.setOutput(currentKeyOutput);
+          keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+          if (keyVectorSerializeWrite.getHasAnyNulls()) {
+            // Not expecting NULLs in MapJoin -- they should have been filtered out.
+            throw new HiveException("Null key not expected in MapJoin");
+          }
 
           /*
            * Equal key series checking.

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
index 43e6fa7..f03bf6f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiMultiKeyOperator.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
 
 // Multi-Key specific imports.
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRowNoNulls;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 
@@ -64,7 +64,7 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
 
   // Object that can take a set of columns in row in a vectorized row batch and serialized it.
   // Known to not have any nulls.
-  private transient VectorSerializeRowNoNulls keyVectorSerializeWriteNoNulls;
+  private transient VectorSerializeRow keyVectorSerializeWrite;
 
   // The BinarySortable serialization of the current key.
   private transient Output currentKeyOutput;
@@ -104,9 +104,9 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
          * Initialize Multi-Key members for this specialized class.
          */
 
-        keyVectorSerializeWriteNoNulls = new VectorSerializeRowNoNulls(
+        keyVectorSerializeWrite = new VectorSerializeRow(
                                         new BinarySortableSerializeWrite(bigTableKeyColumnMap.length));
-        keyVectorSerializeWriteNoNulls.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
+        keyVectorSerializeWrite.init(bigTableKeyTypeNames, bigTableKeyColumnMap);
 
         currentKeyOutput = new Output();
         saveKeyOutput = new Output();
@@ -193,8 +193,12 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
          * Multi-Key specific repeated lookup.
          */
 
-        keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
-        keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, 0);
+        keyVectorSerializeWrite.setOutput(currentKeyOutput);
+        keyVectorSerializeWrite.serializeWrite(batch, 0);
+        if (keyVectorSerializeWrite.getHasAnyNulls()) {
+          // Not expecting NULLs in MapJoin -- they should have been filtered out.
+          throw new HiveException("Null key not expected in MapJoin");
+        }
         byte[] keyBytes = currentKeyOutput.getData();
         int keyLength = currentKeyOutput.getLength();
         // LOG.debug(CLASS_NAME + " processOp all " + displayBytes(keyBytes, 0, keyLength));
@@ -247,8 +251,12 @@ public class VectorMapJoinLeftSemiMultiKeyOperator extends VectorMapJoinLeftSemi
            */
 
           // Generate binary sortable key for current row in vectorized row batch.
-          keyVectorSerializeWriteNoNulls.setOutput(currentKeyOutput);
-          keyVectorSerializeWriteNoNulls.serializeWriteNoNulls(batch, batchIndex);
+          keyVectorSerializeWrite.setOutput(currentKeyOutput);
+          keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+          if (keyVectorSerializeWrite.getHasAnyNulls()) {
+            // Not expecting NULLs in MapJoin -- they should have been filtered out.
+            throw new HiveException("Null key not expected in MapJoin");
+          }
 
           // LOG.debug(CLASS_NAME + " currentKey " +
           //      VectorizedBatchUtil.displayBytes(currentKeyOutput.getData(), 0, currentKeyOutput.getLength()));

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
index 49e0e85..2c98a24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterMultiKeyOperator.java
@@ -294,9 +294,8 @@ public class VectorMapJoinOuterMultiKeyOperator extends VectorMapJoinOuterGenera
 
           // Generate binary sortable key for current row in vectorized row batch.
           keyVectorSerializeWrite.setOutput(currentKeyOutput);
-          boolean isNull = keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
-
-          if (isNull) {
+          keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+          if (keyVectorSerializeWrite.getHasAnyNulls()) {
 
             // Have that the NULL does not interfere with the current equal key series, if there
             // is one. We do not set saveJoinResult.

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 36ee768..0ff98bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.JoinUtil;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
 
 /*
  * An single byte array value hash map optimized for vector map join.
@@ -71,7 +72,7 @@ public abstract class VectorMapJoinFastBytesHashMap
 
     optimizedHashMapResult.forget();
 
-    long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
     long valueRefWord = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
     JoinUtil.JoinResult joinResult;
     if (valueRefWord == -1) {

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index fc04504..5d8ed2d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.JoinUtil;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
 
 /*
  * An single byte array value hash multi-set optimized for vector map join.
@@ -67,7 +68,7 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
 
     optimizedHashMultiSetResult.forget();
 
-    long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
     long count = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
     JoinUtil.JoinResult joinResult;
     if (count == -1) {

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index bac10df..990a2e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.JoinUtil;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashSet;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashSetResult;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
 
 /*
  * An single byte array value hash multi-set optimized for vector map join.
@@ -62,7 +63,7 @@ public abstract class VectorMapJoinFastBytesHashSet
 
     optimizedHashSetResult.forget();
 
-    long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
     long existance = findReadSlot(keyBytes, keyStart, keyLength, hashCode);
     JoinUtil.JoinResult joinResult;
     if (existance == -1) {

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index c06482b..b978bf0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -70,7 +71,7 @@ public abstract class VectorMapJoinFastBytesHashTable
       expandAndRehash();
     }
 
-    long hashCode = VectorMapJoinFastBytesHashUtil.hashKey(keyBytes, keyStart, keyLength);
+    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
     int intHashCode = (int) hashCode;
     int slot = (intHashCode & logicalHashBucketMask);
     long probeSlot = slot;

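All of the fast bytes hash tables now share HashCodeUtil.murmurHash as the single hash entry
point. A minimal sketch of the hash-to-slot math as it appears in the hunk above
(logicalHashBucketMask is the table's existing bucket-count-minus-one mask; the power-of-two
sizing it implies is not shown in this hunk):

    long hashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);
    int intHashCode = (int) hashCode;
    int slot = (intHashCode & logicalHashBucketMask);   // first bucket to probe
    long probeSlot = slot;                               // probing continues from here (not shown)
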
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java
index 28f7357..80126ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashUtil.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.hive.serde2.WriteBuffers;
 
 public class VectorMapJoinFastBytesHashUtil {
 
-  public static long hashKey(byte[] bytes, int start, int length) {
-    return WriteBuffers.murmurHash(bytes, start, length);
-  }
-
   public static String displayBytes(byte[] bytes, int start, int length) {
     StringBuilder sb = new StringBuilder();
     for (int i = start; i < start + length; i++) {

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastIntHashUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastIntHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastIntHashUtil.java
deleted file mode 100644
index a818cb2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastIntHashUtil.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
-
-public class VectorMapJoinFastIntHashUtil {
-
-  public static int hashKey(int key) {
-    key = ~key + (key << 15); // key = (key << 15) - key - 1;
-    key = key ^ (key >>> 12);
-    key = key + (key << 2);
-    key = key ^ (key >>> 4);
-    key = key * 2057; // key = (key + (key << 3)) + (key << 11);
-    key = key ^ (key >>> 16);
-    return key;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
index 149f1d0..1384fc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHash
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
 
 /*
  * An single long value map optimized for vector map join.
@@ -67,7 +68,7 @@ public class VectorMapJoinFastLongHashMap
 
     optimizedHashMapResult.forget();
 
-    long hashCode = VectorMapJoinFastLongHashUtil.hashKey(key);
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
     // LOG.debug("VectorMapJoinFastLongHashMap lookup " + key + " hashCode " + hashCode);
     long valueRef = findReadSlot(key, hashCode);
     JoinUtil.JoinResult joinResult;

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
index 87c17e7..94bf706 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
 
 /*
  * An single long value multi-set optimized for vector map join.
@@ -67,7 +68,7 @@ public class VectorMapJoinFastLongHashMultiSet
 
     optimizedHashMultiSetResult.forget();
 
-    long hashCode = VectorMapJoinFastLongHashUtil.hashKey(key);
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
     long count = findReadSlot(key, hashCode);
     JoinUtil.JoinResult joinResult;
     if (count == -1) {

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
index d5aa99c..2cbc548 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHash
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashSet;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
 
 /*
  * An single long value multi-set optimized for vector map join.
@@ -60,7 +61,7 @@ public class VectorMapJoinFastLongHashSet
 
     optimizedHashSetResult.forget();
 
-    long hashCode = VectorMapJoinFastLongHashUtil.hashKey(key);
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
     long existance = findReadSlot(key, hashCode);
     JoinUtil.JoinResult joinResult;
     if (existance == -1) {

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 5b48fcf..7ea3455 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeseriali
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.common.util.HashCodeUtil;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -111,7 +112,7 @@ public abstract class VectorMapJoinFastLongHashTable
       expandAndRehash();
     }
 
-    long hashCode = VectorMapJoinFastLongHashUtil.hashKey(key);
+    long hashCode = HashCodeUtil.calculateLongHashCode(key);
     int intHashCode = (int) hashCode;
     int slot = (intHashCode & logicalHashBucketMask);
     long probeSlot = slot;
@@ -179,7 +180,7 @@ public abstract class VectorMapJoinFastLongHashTable
         long tableKey = slotPairs[pairIndex + 1];
 
         // Copy to new slot table.
-        long hashCode = VectorMapJoinFastLongHashUtil.hashKey(tableKey);
+        long hashCode = HashCodeUtil.calculateLongHashCode(tableKey);
         int intHashCode = (int) hashCode;
         int newSlot = intHashCode & newLogicalHashBucketMask;
         long newProbeSlot = newSlot;

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
index 298ca61..1877f14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashUtil.java
@@ -25,17 +25,6 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableDeseriali
 
 public class VectorMapJoinFastLongHashUtil {
 
-  public static long hashKey(long key) {
-    key = (~key) + (key << 21); // key = (key << 21) - key - 1;
-    key = key ^ (key >>> 24);
-    key = (key + (key << 3)) + (key << 8); // key * 265
-    key = key ^ (key >>> 14);
-    key = (key + (key << 2)) + (key << 4); // key * 21
-    key = key ^ (key >>> 28);
-    key = key + (key << 31);
-    return key;
-  }
-
   public static long deserializeLongKey(BinarySortableDeserializeRead keyBinarySortableDeserializeRead,
       HashTableKeyType hashTableKeyType) throws IOException {
     long key = 0;

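Taken together, these hunks delete the local bit-mixing hash helpers
(VectorMapJoinFastIntHashUtil, plus hashKey in the bytes and long hash utils) and route every
key hash through the shared org.apache.hive.common.util.HashCodeUtil. The two call shapes used
across the map-join fast hash tables in this patch are:

    // BinarySortable-serialized byte-array keys:
    long bytesHashCode = HashCodeUtil.murmurHash(keyBytes, keyStart, keyLength);

    // Single long keys:
    long longHashCode = HashCodeUtil.calculateLongHashCode(key);
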
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
new file mode 100644
index 0000000..a79a649
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/**
+ * This is the common operator class for the native vectorized reduce sink.
+ */
+public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<ReduceSinkDesc>
+    implements VectorizationContextRegion {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorReduceSinkCommonOperator.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected VectorReduceSinkDesc vectorDesc;
+
+  /**
+   * Information about our native vectorized reduce sink, created by the Vectorizer class during
+   * its decision process and useful for execution.
+   */
+  protected VectorReduceSinkInfo vectorReduceSinkInfo;
+
+  protected VectorizationContext vContext;
+
+  /**
+   * Reduce sink key vector expressions.
+   */
+
+  // This is a map of which vectorized row batch columns are the key columns,
+  // and their types.
+  protected int[] reduceSinkKeyColumnMap;
+  protected TypeInfo[] reduceSinkKeyTypeInfos;
+
+  // Optional vectorized key expressions that need to be run on each batch.
+  protected VectorExpression[] reduceSinkKeyExpressions;
+
+  // This is a map of which vectorized row batch columns are the value columns,
+  // and their types.
+  protected int[] reduceSinkValueColumnMap;
+  protected TypeInfo[] reduceSinkValueTypeInfos;
+
+  // Optional vectorized value expressions that need to be run on each batch.
+  protected VectorExpression[] reduceSinkValueExpressions;
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  // Whether there is to be a tag added to the end of each key and the tag value.
+  private transient boolean reduceSkipTag;
+  private transient byte reduceTagByte;
+
+  // Binary sortable key serializer.
+  protected transient BinarySortableSerializeWrite keyBinarySortableSerializeWrite;
+
+  // The serialized all null key and its hash code.
+  private transient byte[] nullBytes;
+  private transient int nullKeyHashCode;
+
+  // Lazy binary value serializer.
+  private transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+
+  // This helper object serializes LazyBinary format reducer values from columns of a row
+  // in a vectorized row batch.
+  private transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+
+  // The output buffer used to serialize a value into.
+  private transient Output valueOutput;
+
+  // The HiveKey and BytesWritable needed to pass the key and value to the collector.
+  private transient HiveKey keyWritable;
+  private transient BytesWritable valueBytesWritable;
+
+  // Where to write our key and value pairs.
+  private transient OutputCollector out;
+
+  // The object that determines equal key series.
+  protected transient VectorKeySeriesSerialized serializedKeySeries;
+
+  private transient long numRows = 0;
+  private transient long cntr = 1;
+  private transient long logEveryNRows = 0;
+  private final transient LongWritable recordCounter = new LongWritable();
+
+  // For debug tracing: the name of the map or reduce task.
+  protected transient String taskName;
+
+  // Debug display.
+  protected transient long batchCounter;
+
+  //---------------------------------------------------------------------------
+
+  public VectorReduceSinkCommonOperator() {
+    super();
+  }
+
+  public VectorReduceSinkCommonOperator(VectorizationContext vContext, OperatorDesc conf)
+          throws HiveException {
+    super();
+
+    ReduceSinkDesc desc = (ReduceSinkDesc) conf;
+    this.conf = desc;
+    vectorDesc = desc.getVectorDesc();
+    vectorReduceSinkInfo = vectorDesc.getVectorReduceSinkInfo();
+    this.vContext = vContext;
+
+    // Since a key expression can be a calculation and the key will go into a scratch column,
+    // we need the mapping and type information.
+    reduceSinkKeyColumnMap = vectorReduceSinkInfo.getReduceSinkKeyColumnMap();
+    reduceSinkKeyTypeInfos = vectorReduceSinkInfo.getReduceSinkKeyTypeInfos();
+    reduceSinkKeyExpressions = vectorReduceSinkInfo.getReduceSinkKeyExpressions();
+
+    reduceSinkValueColumnMap = vectorReduceSinkInfo.getReduceSinkValueColumnMap();
+    reduceSinkValueTypeInfos = vectorReduceSinkInfo.getReduceSinkValueTypeInfos();
+    reduceSinkValueExpressions = vectorReduceSinkInfo.getReduceSinkValueExpressions();
+  }
+
+  // Get the sort order
+  private boolean[] getColumnSortOrder(Properties properties, int columnCount) {
+    String columnSortOrder = properties.getProperty(serdeConstants.SERIALIZATION_SORT_ORDER);
+    boolean[] columnSortOrderIsDesc = new boolean[columnCount];
+    if (columnSortOrder == null) {
+      Arrays.fill(columnSortOrderIsDesc, false);
+    } else {
+      for (int i = 0; i < columnSortOrderIsDesc.length; i++) {
+        columnSortOrderIsDesc[i] = (columnSortOrder.charAt(i) == '-');
+      }
+    }
+    return columnSortOrderIsDesc;
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    if (LOG.isDebugEnabled()) {
+      // Determine the name of our map or reduce task for debug tracing.
+      BaseWork work = Utilities.getMapWork(hconf);
+      if (work == null) {
+        work = Utilities.getReduceWork(hconf);
+      }
+      taskName = work.getName();
+    }
+
+    String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    if (context != null && !context.isEmpty()) {
+      context = "_" + context.replace(" ","_");
+    }
+    statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+
+    reduceSkipTag = conf.getSkipTag();
+    reduceTagByte = (byte) conf.getTag();
+
+    if (isLogInfoEnabled) {
+      LOG.info("Using tag = " + (int) reduceTagByte);
+    }
+
+    TableDesc keyTableDesc = conf.getKeySerializeInfo();
+    boolean[] columnSortOrder =
+        getColumnSortOrder(keyTableDesc.getProperties(), reduceSinkKeyColumnMap.length);
+
+    keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(columnSortOrder);
+
+    // Create all nulls key.
+    try {
+      Output nullKeyOutput = new Output();
+      keyBinarySortableSerializeWrite.set(nullKeyOutput);
+      for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) {
+        keyBinarySortableSerializeWrite.writeNull();
+      }
+      int nullBytesLength = nullKeyOutput.getLength();
+      nullBytes = new byte[nullBytesLength];
+      System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength);
+      nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+
+    valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(reduceSinkValueColumnMap.length);
+
+    valueVectorSerializeRow =
+        new VectorSerializeRow<LazyBinarySerializeWrite>(
+            valueLazyBinarySerializeWrite);
+    valueVectorSerializeRow.init(reduceSinkValueTypeInfos, reduceSinkValueColumnMap);
+
+    valueOutput = new Output();
+    valueVectorSerializeRow.setOutput(valueOutput);
+
+    keyWritable = new HiveKey();
+
+    valueBytesWritable = new BytesWritable();
+
+    batchCounter = 0;
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      batchCounter++;
+
+      if (batch.size == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+        }
+        return;
+      }
+
+      // Perform any key expressions.  Results will go into scratch columns.
+      if (reduceSinkKeyExpressions != null) {
+        for (VectorExpression ve : reduceSinkKeyExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+
+      // Perform any value expressions.  Results will go into scratch columns.
+      if (reduceSinkValueExpressions != null) {
+        for (VectorExpression ve : reduceSinkValueExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+
+      serializedKeySeries.processBatch(batch);
+
+      boolean selectedInUse = batch.selectedInUse;
+      int[] selected = batch.selected;
+
+      int keyLength;
+      int logical;
+      int end;
+      int batchIndex;
+      do {
+        if (serializedKeySeries.getCurrentIsAllNull()) {
+
+          // Use the same logic as ReduceSinkOperator.toHiveKey.
+          //
+          if (tag == -1 || reduceSkipTag) {
+            keyWritable.set(nullBytes, 0, nullBytes.length);
+          } else {
+            keyWritable.setSize(nullBytes.length + 1);
+            System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length);
+            keyWritable.get()[nullBytes.length] = reduceTagByte;
+          }
+          keyWritable.setDistKeyLength(nullBytes.length);
+          keyWritable.setHashCode(nullKeyHashCode);
+
+        } else {
+
+          // One serialized key for 1 or more rows for the duplicate keys.
+          // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength());
+          // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength());
+          keyLength = serializedKeySeries.getSerializedLength();
+          if (tag == -1 || reduceSkipTag) {
+            keyWritable.set(serializedKeySeries.getSerializedBytes(),
+                serializedKeySeries.getSerializedStart(), keyLength);
+          } else {
+            keyWritable.setSize(keyLength + 1);
+            System.arraycopy(serializedKeySeries.getSerializedBytes(),
+                serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength);
+            keyWritable.get()[keyLength] = reduceTagByte;
+          }
+          keyWritable.setDistKeyLength(keyLength);
+          keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode());
+        }
+
+        logical = serializedKeySeries.getCurrentLogical();
+        end = logical + serializedKeySeries.getCurrentDuplicateCount();
+        do {
+          batchIndex = (selectedInUse ? selected[logical] : logical);
+
+          valueLazyBinarySerializeWrite.reset();
+          valueVectorSerializeRow.serializeWrite(batch, batchIndex);
+
+          valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
+
+          collect(keyWritable, valueBytesWritable);
+        } while (++logical < end);
+  
+        if (!serializedKeySeries.next()) {
+          break;
+        }
+      } while (true);
+
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
+    // Since this is a terminal operator, update counters explicitly -
+    // forward is not called
+    if (null != out) {
+      numRows++;
+      if (isLogInfoEnabled) {
+        if (numRows == cntr) {
+          cntr = logEveryNRows == 0 ? cntr * 10 : numRows + logEveryNRows;
+          if (cntr < 0 || numRows < 0) {
+            cntr = 0;
+            numRows = 1;
+          }
+          LOG.info(toString() + ": records written - " + numRows);
+        }
+      }
+
+      // BytesWritable valueBytesWritable = (BytesWritable) valueWritable;
+      // LOG.info("VectorReduceSinkCommonOperator collect keyWritable " + keyWritable.getLength() + " " +
+      //     VectorizedBatchUtil.displayBytes(keyWritable.getBytes(), 0, keyWritable.getLength()) +
+      //     " valueWritable " + valueBytesWritable.getLength() +
+      //     VectorizedBatchUtil.displayBytes(valueBytesWritable.getBytes(), 0, valueBytesWritable.getLength()));
+
+      out.collect(keyWritable, valueWritable);
+    }
+  }
+
+  @Override
+  protected void closeOp(boolean abort) throws HiveException {
+    super.closeOp(abort);
+    out = null;
+    if (isLogInfoEnabled) {
+      LOG.info(toString() + ": records written - " + numRows);
+    }
+    recordCounter.set(numRows);
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  public static String getOperatorName() {
+    return "RS";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.REDUCESINK;
+  }
+
+  @Override
+  public VectorizationContext getOuputVectorizationContext() {
+    return vContext;
+  }
+
+  @Override
+  public boolean getIsReduceSink() {
+    return true;
+  }
+
+  @Override
+  public String getReduceOutputName() {
+    return conf.getOutputName();
+  }
+
+  @Override
+  public void setOutputCollector(OutputCollector _out) {
+    this.out = _out;
+  }
+}
\ No newline at end of file

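A note on the process loop above: it consumes the key-series contract, building keyWritable once per run of duplicate keys (reported through getCurrentDuplicateCount()) and then collecting one serialized value per row in the run. A minimal standalone sketch of that consumption pattern, using illustrative names rather than the Hive key-series API:

import java.util.Arrays;
import java.util.List;

public class KeySeriesLoopSketch {

  public static void main(String[] args) {
    // Keys for one batch; runs of equal keys are adjacent, mirroring what the
    // key series reports through its duplicate counts.
    List<String> rowKeys = Arrays.asList("a", "a", "b", "c", "c", "c");
    List<String> rowValues = Arrays.asList("v0", "v1", "v2", "v3", "v4", "v5");

    int logical = 0;
    while (logical < rowKeys.size()) {
      // Build the key once per run of duplicate keys.
      String key = rowKeys.get(logical);
      int duplicateCount = 1;
      while (logical + duplicateCount < rowKeys.size()
          && rowKeys.get(logical + duplicateCount).equals(key)) {
        duplicateCount++;
      }

      // Serialize and collect each row's value under the shared key.
      int end = logical + duplicateCount;
      for (int i = logical; i < end; i++) {
        System.out.println("collect(" + key + ", " + rowValues.get(i) + ")");
      }
      logical = end;
    }
  }
}
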
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
new file mode 100644
index 0000000..cec5660
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesLongSerialized;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+/*
+ * Specialized class for native vectorized reduce sink that is reducing on a single long key column.
+ */
+public class VectorReduceSinkLongOperator extends VectorReduceSinkCommonOperator {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorReduceSinkLongOperator.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  // The column number and type information for this one column long reduce key.
+  private transient int singleKeyColumn;
+  private transient PrimitiveTypeInfo singleKeyColumnPrimitiveTypeInfo;
+
+  // The above members are set in initializeOp from the key column map and key
+  // type information, so they are kept transient.
+  //---------------------------------------------------------------------------
+
+  //---------------------------------------------------------------------------
+  // Pass-thru constructors.
+  //
+
+  public VectorReduceSinkLongOperator() {
+    super();
+  }
+
+  public VectorReduceSinkLongOperator(VectorizationContext vContext, OperatorDesc conf)
+          throws HiveException {
+    super(vContext, conf);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    singleKeyColumn = reduceSinkKeyColumnMap[0];
+    singleKeyColumnPrimitiveTypeInfo = (PrimitiveTypeInfo) reduceSinkKeyTypeInfos[0];
+
+    serializedKeySeries =
+        new VectorKeySeriesLongSerialized<BinarySortableSerializeWrite>(
+            singleKeyColumn, singleKeyColumnPrimitiveTypeInfo, keyBinarySortableSerializeWrite);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
new file mode 100644
index 0000000..a4ef66b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesMultiSerialized;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/*
+ * Specialized class for native vectorized reduce sink that is reducing on multiple key columns
+ * (or a single non-long / non-string column).
+ */
+public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkCommonOperator {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorReduceSinkMultiKeyOperator.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  // This operator needs no extra key members; the multi-key series is built in
+  // initializeOp from the key type and column information.
+  //---------------------------------------------------------------------------
+
+  //---------------------------------------------------------------------------
+  // Pass-thru constructors.
+  //
+
+  public VectorReduceSinkMultiKeyOperator() {
+    super();
+  }
+
+  public VectorReduceSinkMultiKeyOperator(VectorizationContext vContext, OperatorDesc conf)
+          throws HiveException {
+    super(vContext, conf);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    VectorKeySeriesMultiSerialized<BinarySortableSerializeWrite> serializedMultiKeySeries =
+        new VectorKeySeriesMultiSerialized<BinarySortableSerializeWrite>(
+            keyBinarySortableSerializeWrite);
+    serializedMultiKeySeries.init(reduceSinkKeyTypeInfos, reduceSinkKeyColumnMap);
+
+    serializedKeySeries = serializedMultiKeySeries;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
new file mode 100644
index 0000000..b6cb527
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesBytesSerialized;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+/*
+ * Specialized class for native vectorized reduce sink that is reducing on a single string key column.
+ */
+public class VectorReduceSinkStringOperator extends VectorReduceSinkCommonOperator {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorReduceSinkStringOperator.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  // The column number for this one column string reduce key.
+  private transient int singleKeyColumn;
+
+  // The above member is set in initializeOp from the key column map, so it is
+  // kept transient.
+  //---------------------------------------------------------------------------
+
+  //---------------------------------------------------------------------------
+  // Pass-thru constructors.
+  //
+
+  public VectorReduceSinkStringOperator() {
+    super();
+  }
+
+  public VectorReduceSinkStringOperator(VectorizationContext vContext, OperatorDesc conf)
+          throws HiveException {
+    super(vContext, conf);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    singleKeyColumn = reduceSinkKeyColumnMap[0];
+
+    serializedKeySeries =
+        new VectorKeySeriesBytesSerialized<BinarySortableSerializeWrite>(
+            singleKeyColumn, keyBinarySortableSerializeWrite);
+  }
+}
\ No newline at end of file

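The three operators above are thin shells: they differ only in which key-series implementation initializeOp wires up, so the per-row work in the common operator stays specialized for the chosen key shape. A standalone sketch of that pattern (illustrative names, not the Hive classes):

interface KeySeries {
  // Stand-in for the VectorKeySeries iteration contract.
}

class LongKeySeries implements KeySeries { }
class BytesKeySeries implements KeySeries { }
class MultiKeySeries implements KeySeries { }

enum KeyShape { LONG, STRING, MULTI_KEY }

class KeySeriesChoiceSketch {
  // Each specialized operator effectively performs one branch of this choice
  // in its initializeOp.
  static KeySeries keySeriesFor(KeyShape keyShape) {
    switch (keyShape) {
    case LONG:
      return new LongKeySeries();    // single integer-family key column
    case STRING:
      return new BytesKeySeries();   // single string key column
    case MULTI_KEY:
      return new MultiKeySeries();   // any other key shape
    default:
      throw new IllegalArgumentException("Unknown key shape " + keyShape);
    }
  }
}
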
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 4dead18..97e7013 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,7 +55,10 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiString
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
@@ -61,7 +66,10 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -89,10 +97,12 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
@@ -100,6 +110,8 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -140,8 +152,10 @@ import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
 import org.apache.hadoop.hive.ql.udf.UDFYear;
 import org.apache.hadoop.hive.ql.udf.generic.*;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -1716,6 +1730,189 @@ public class Vectorizer implements PhysicalPlanResolver {
     return specialize;
   }
 
+  private Operator<? extends OperatorDesc> specializeReduceSinkOperator(
+      Operator<? extends OperatorDesc> op, VectorizationContext vContext, ReduceSinkDesc desc,
+      VectorReduceSinkInfo vectorReduceSinkInfo) throws HiveException {
+
+    Operator<? extends OperatorDesc> vectorOp = null;
+    Class<? extends Operator<?>> opClass = null;
+
+    Type[] reduceSinkKeyColumnVectorTypes = vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes();
+
+    // By default, we can always use the multi-key class.
+    VectorReduceSinkDesc.ReduceSinkKeyType reduceSinkKeyType = VectorReduceSinkDesc.ReduceSinkKeyType.MULTI_KEY;
+
+    // Look for single column optimization.
+    if (reduceSinkKeyColumnVectorTypes.length == 1) {
+      LOG.info("Vectorizer vectorizeOperator groupby typeName " + vectorReduceSinkInfo.getReduceSinkKeyTypeInfos()[0]);
+      Type columnVectorType = reduceSinkKeyColumnVectorTypes[0];
+      switch (columnVectorType) {
+      case LONG:
+        {
+          PrimitiveCategory primitiveCategory =
+              ((PrimitiveTypeInfo) vectorReduceSinkInfo.getReduceSinkKeyTypeInfos()[0]).getPrimitiveCategory();
+          switch (primitiveCategory) {
+          case BOOLEAN:
+          case BYTE:
+          case SHORT:
+          case INT:
+          case LONG:
+            reduceSinkKeyType = VectorReduceSinkDesc.ReduceSinkKeyType.LONG;
+            break;
+          default:
+            // Other types that use the long column vector are not supported yet.
+            break;
+          }
+        }
+        break;
+      case BYTES:
+        reduceSinkKeyType = VectorReduceSinkDesc.ReduceSinkKeyType.STRING;
+      default:
+        // Stay with multi-key.
+        break;
+      }
+    }
+
+    switch (reduceSinkKeyType) {
+    case LONG:
+      opClass = VectorReduceSinkLongOperator.class;
+      break;
+    case STRING:
+      opClass = VectorReduceSinkStringOperator.class;
+      break;
+    case MULTI_KEY:
+      opClass = VectorReduceSinkMultiKeyOperator.class;
+      break;
+    default:
+      throw new HiveException("Unknown reduce sink key type " + reduceSinkKeyType);
+    }
+
+    VectorReduceSinkDesc vectorDesc = new VectorReduceSinkDesc();
+    desc.setVectorDesc(vectorDesc);
+    vectorDesc.setReduceSinkKeyType(reduceSinkKeyType);
+    vectorDesc.setVectorReduceSinkInfo(vectorReduceSinkInfo);
+
+    vectorOp = OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
+    LOG.info("Vectorizer vectorizeOperator reduce sink class " + vectorOp.getClass().getSimpleName());
+
+    return vectorOp;
+  }
+
+  private boolean canSpecializeReduceSink(ReduceSinkDesc desc,
+      boolean isTez, VectorizationContext vContext,
+      VectorReduceSinkInfo vectorReduceSinkInfo) throws HiveException {
+
+    if (!HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCESINK_NEW_ENABLED)) {
+      return false;
+    }
+
+    // Many restrictions; failing any of them keeps the non-native vectorized ReduceSink.
+
+    if (!isTez) {
+      return false;
+    }
+
+    if (desc.getWriteType() == AcidUtils.Operation.UPDATE ||
+        desc.getWriteType() == AcidUtils.Operation.DELETE) {
+      return false;
+    }
+
+    if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
+      return false;
+    }
+
+    boolean useUniformHash = desc.getReducerTraits().contains(UNIFORM);
+    if (!useUniformHash) {
+      return false;
+    }
+
+    if (desc.getTopN() >= 0) {
+      return false;
+    }
+
+    if (desc.getDistinctColumnIndices().size() > 0) {
+      return false;
+    }
+
+    TableDesc keyTableDesc = desc.getKeySerializeInfo();
+    Class<? extends Deserializer> keyDeserializerClass = keyTableDesc.getDeserializerClass();
+    if (keyDeserializerClass != org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe.class) {
+      return false;
+    }
+ 
+    TableDesc valueTableDesc = desc.getValueSerializeInfo();
+    Class<? extends Deserializer> valueDeserializerClass = valueTableDesc.getDeserializerClass();
+    if (valueDeserializerClass != org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class) {
+      return false;
+    }
+
+    // We are doing work here that we'd normally do when initializing VectorReduceSinkCommonOperator.
+    // So if we later decide not to specialize, we'll just waste any scratch columns allocated...
+
+    List<ExprNodeDesc> keysDescs = desc.getKeyCols();
+    VectorExpression[] allKeyExpressions = vContext.getVectorExpressions(keysDescs);
+
+    // Since a key expression can be a calculation and the key will go into a scratch column,
+    // we need the mapping and type information.
+    int[] reduceSinkKeyColumnMap = new int[allKeyExpressions.length];
+    TypeInfo[] reduceSinkKeyTypeInfos = new TypeInfo[allKeyExpressions.length];
+    Type[] reduceSinkKeyColumnVectorTypes = new Type[allKeyExpressions.length];
+    ArrayList<VectorExpression> reduceSinkKeyExpressionsList = new ArrayList<VectorExpression>();
+    VectorExpression[] reduceSinkKeyExpressions;
+    for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) {
+      VectorExpression ve = allKeyExpressions[i];
+      reduceSinkKeyColumnMap[i] = ve.getOutputColumn();
+      reduceSinkKeyTypeInfos[i] = keysDescs.get(i).getTypeInfo();
+      reduceSinkKeyColumnVectorTypes[i] =
+          VectorizationContext.getColumnVectorTypeFromTypeInfo(reduceSinkKeyTypeInfos[i]);
+      if (!IdentityExpression.isColumnOnly(ve)) {
+        reduceSinkKeyExpressionsList.add(ve);
+      }
+    }
+    if (reduceSinkKeyExpressionsList.size() == 0) {
+      reduceSinkKeyExpressions = null;
+    } else {
+      reduceSinkKeyExpressions = reduceSinkKeyExpressionsList.toArray(new VectorExpression[0]);
+    }
+
+    ArrayList<ExprNodeDesc> valueDescs = desc.getValueCols();
+    VectorExpression[] allValueExpressions = vContext.getVectorExpressions(valueDescs);
+
+    int[] reduceSinkValueColumnMap = new int[valueDescs.size()];
+    TypeInfo[] reduceSinkValueTypeInfos = new TypeInfo[valueDescs.size()];
+    Type[] reduceSinkValueColumnVectorTypes = new Type[valueDescs.size()];
+    ArrayList<VectorExpression> reduceSinkValueExpressionsList = new ArrayList<VectorExpression>();
+    VectorExpression[] reduceSinkValueExpressions;
+    for (int i = 0; i < valueDescs.size(); ++i) {
+      VectorExpression ve = allValueExpressions[i];
+      reduceSinkValueColumnMap[i] = ve.getOutputColumn();
+      reduceSinkValueTypeInfos[i] = valueDescs.get(i).getTypeInfo();
+      reduceSinkValueColumnVectorTypes[i] =
+          VectorizationContext.getColumnVectorTypeFromTypeInfo(reduceSinkValueTypeInfos[i]);
+      if (!IdentityExpression.isColumnOnly(ve)) {
+        reduceSinkValueExpressionsList.add(ve);
+      }
+    }
+    if (reduceSinkValueExpressionsList.size() == 0) {
+      reduceSinkValueExpressions = null;
+    } else {
+      reduceSinkValueExpressions = reduceSinkValueExpressionsList.toArray(new VectorExpression[0]);
+    }
+ 
+    vectorReduceSinkInfo.setReduceSinkKeyColumnMap(reduceSinkKeyColumnMap);
+    vectorReduceSinkInfo.setReduceSinkKeyTypeInfos(reduceSinkKeyTypeInfos);
+    vectorReduceSinkInfo.setReduceSinkKeyColumnVectorTypes(reduceSinkKeyColumnVectorTypes);
+    vectorReduceSinkInfo.setReduceSinkKeyExpressions(reduceSinkKeyExpressions);
+
+    vectorReduceSinkInfo.setReduceSinkValueColumnMap(reduceSinkValueColumnMap);
+    vectorReduceSinkInfo.setReduceSinkValueTypeInfos(reduceSinkValueTypeInfos);
+    vectorReduceSinkInfo.setReduceSinkValueColumnVectorTypes(reduceSinkValueColumnVectorTypes);
+    vectorReduceSinkInfo.setReduceSinkValueExpressions(reduceSinkValueExpressions);
+
+    return true;
+  }
+
   Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
       VectorizationContext vContext, boolean isTez) throws HiveException {
     Operator<? extends OperatorDesc> vectorOp = null;
@@ -1756,11 +1953,28 @@ public class Vectorizer implements PhysicalPlanResolver {
           }
         }
         break;
+      
+      case REDUCESINK:
+        {
+          VectorReduceSinkInfo vectorReduceSinkInfo = new VectorReduceSinkInfo();
+          ReduceSinkDesc desc = (ReduceSinkDesc) op.getConf();
+          boolean specialize = canSpecializeReduceSink(desc, isTez, vContext, vectorReduceSinkInfo);
+
+          if (!specialize) {
+
+            vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
+
+          } else {
+
+            vectorOp = specializeReduceSinkOperator(op, vContext, desc, vectorReduceSinkInfo);
+
+          }
+        }
+        break;
       case GROUPBY:
       case FILTER:
       case SELECT:
       case FILESINK:
-      case REDUCESINK:
       case LIMIT:
       case EXTRACT:
       case EVENT:

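The eligibility checks in canSpecializeReduceSink above can be read as one predicate: the native path is taken only on Tez, with uniform reducer traits, no ACID update/delete, no bucketing, no top-N, no distinct columns, BinarySortableSerDe keys and LazyBinarySerDe values, and only when the new-ReduceSink conf flag is on. A condensed standalone sketch (illustrative field names, not the Vectorizer API):

class ReduceSinkEligibilitySketch {
  boolean nativeEnabled;         // HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCESINK_NEW_ENABLED
  boolean isTez;
  boolean isAcidUpdateOrDelete;  // desc.getWriteType() is UPDATE or DELETE
  boolean hasBucketCols;
  boolean uniformReducerTraits;  // desc.getReducerTraits().contains(UNIFORM)
  int topN;                      // desc.getTopN(); negative means no top-N
  int distinctColumnCount;
  boolean binarySortableKeys;    // key serde is BinarySortableSerDe
  boolean lazyBinaryValues;      // value serde is LazyBinarySerDe

  // Any failed check falls back to the non-native vectorized ReduceSink.
  boolean canSpecialize() {
    return nativeEnabled
        && isTez
        && !isAcidUpdateOrDelete
        && !hasBucketCols
        && uniformReducerTraits
        && topN < 0
        && distinctColumnCount == 0
        && binarySortableKeys
        && lazyBinaryValues;
  }
}
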
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 615739e..2f69b7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -120,6 +120,10 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
   private transient boolean hasOrderBy = false;
 
   private static transient Logger LOG = LoggerFactory.getLogger(ReduceSinkDesc.class);
+
+  // Extra parameters only for vectorization.
+  private VectorReduceSinkDesc vectorDesc;
+
   public ReduceSinkDesc() {
   }
 
@@ -146,6 +150,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     this.setNumBuckets(-1);
     this.setBucketCols(null);
     this.writeType = writeType;
+    this.vectorDesc = null;
   }
 
   @Override
@@ -175,9 +180,21 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     desc.reduceTraits = reduceTraits.clone();
     desc.setDeduplicated(isDeduplicated);
     desc.setHasOrderBy(hasOrderBy);
+    if (vectorDesc != null) {
+      throw new RuntimeException("Clone with vectorization desc not supported");
+    }
+    desc.vectorDesc = null;
     return desc;
   }
 
+  public void setVectorDesc(VectorReduceSinkDesc vectorDesc) {
+    this.vectorDesc = vectorDesc;
+  }
+
+  public VectorReduceSinkDesc getVectorDesc() {
+    return vectorDesc;
+  }
+
   public java.util.ArrayList<java.lang.String> getOutputKeyColumnNames() {
     return outputKeyColumnNames;
   }

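The clone() guard added above suggests the intended ordering: the vector descriptor is attached by the Vectorizer only after any plan cloning, and a populated descriptor is never copied. A tiny standalone sketch of the guard (illustrative class, not ReduceSinkDesc itself):

class DescCloneGuardSketch implements Cloneable {
  Object vectorDesc;   // stands in for the attached VectorReduceSinkDesc

  @Override
  public DescCloneGuardSketch clone() {
    if (vectorDesc != null) {
      throw new RuntimeException("Clone with vectorization desc not supported");
    }
    DescCloneGuardSketch copy = new DescCloneGuardSketch();
    copy.vectorDesc = null;   // a clone never inherits the vector descriptor
    return copy;
  }
}
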
http://git-wip-us.apache.org/repos/asf/hive/blob/409db57d/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
new file mode 100644
index 0000000..c56bff6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorReduceSinkDesc.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+/**
+ * VectorReduceSinkDesc.
+ *
+ * Extra parameters beyond ReduceSinkDesc just for the VectorReduceSinkOperator.
+ *
+ * We don't extend ReduceSinkDesc because the base OperatorDesc doesn't support
+ * clone and adding it is a lot of work for little gain.
+ */
+public class VectorReduceSinkDesc extends AbstractVectorDesc  {
+
+  private static final long serialVersionUID = 1L;
+
+  public static enum ReduceSinkKeyType {
+    NONE,
+    LONG,
+    STRING,
+    MULTI_KEY
+  }
+
+  private ReduceSinkKeyType reduceSinkKeyType;
+
+  private VectorReduceSinkInfo vectorReduceSinkInfo;
+
+  public VectorReduceSinkDesc() {
+    reduceSinkKeyType = ReduceSinkKeyType.NONE;
+    vectorReduceSinkInfo = null;
+  }
+
+  public ReduceSinkKeyType reduceSinkKeyType() {
+    return reduceSinkKeyType;
+  }
+
+  public void setReduceSinkKeyType(ReduceSinkKeyType reduceSinkKeyType) {
+    this.reduceSinkKeyType = reduceSinkKeyType;
+  }
+
+  public void setVectorReduceSinkInfo(VectorReduceSinkInfo vectorReduceSinkInfo) {
+    this.vectorReduceSinkInfo = vectorReduceSinkInfo;
+  }
+
+  public VectorReduceSinkInfo getVectorReduceSinkInfo() {
+    return vectorReduceSinkInfo;
+  }
+}
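
To tie the plan-side pieces together: the Vectorizer builds a VectorReduceSinkInfo, wraps it in a VectorReduceSinkDesc, and hangs it off the ReduceSinkDesc; the specialized operator reads it back through the same getters. A short sketch using only the setters/getters that appear in this patch (the surrounding population of VectorReduceSinkInfo is assumed to have happened as in canSpecializeReduceSink):

import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;

class VectorReduceSinkDescUsageSketch {

  // Plan side: attach the vectorization descriptor to the ReduceSinkDesc.
  static void attach(ReduceSinkDesc desc, VectorReduceSinkInfo info,
      VectorReduceSinkDesc.ReduceSinkKeyType keyType) {
    VectorReduceSinkDesc vectorDesc = new VectorReduceSinkDesc();
    vectorDesc.setReduceSinkKeyType(keyType);
    vectorDesc.setVectorReduceSinkInfo(info);
    desc.setVectorDesc(vectorDesc);
  }

  // Operator side: read the key/value layout back from the descriptor.
  static VectorReduceSinkInfo readBack(ReduceSinkDesc desc) {
    return desc.getVectorDesc().getVectorReduceSinkInfo();
  }
}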