You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:31:52 UTC

[05/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index 49ecdd1..b015e43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
@@ -68,6 +68,21 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
     Map<Integer, String> parentToInput = desc.getParentToInput();
     Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
 
+    final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+    final long memoryCheckInterval = HiveConf.getLongVar(hconf,
+      HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+    final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+    long numEntries = 0;
+    long noCondTaskSize = desc.getNoConditionalTaskSize();
+    boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+    if (!doMemCheck) {
+      LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
+          "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
+    } else {
+      LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
+        noCondTaskSize, inflationFactor);
+    }
+
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
@@ -93,15 +108,41 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
                 new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
 
+        LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName());
+
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
         while (kvReader.next()) {
           vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
               (BytesWritable)kvReader.getCurrentValue());
+          numEntries++;
+          if (doMemCheck && numEntries >= memoryCheckInterval) {
+            if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+              final long threshold = (long) (inflationFactor * noCondTaskSize);
+              // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
+              // available for container/executor
+              final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
+              if (estMemUsage > effectiveThreshold) {
+                String msg = "VectorMapJoin Hash table loading exceeded memory limits." +
+                  " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
+                  " inflationFactor: " + inflationFactor + " threshold: " + threshold +
+                  " effectiveThreshold: " + effectiveThreshold;
+                LOG.error(msg);
+                throw new MapJoinMemoryExhaustionError(msg);
+              } else {
+                if (LOG.isInfoEnabled()) {
+                  LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " +
+                    "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+                }
+              }
+            }
+          }
         }
 
         vectorMapJoinFastTableContainer.seal();
-        mapJoinTables[pos] = (MapJoinTableContainer) vectorMapJoinFastTableContainer;
-
+        mapJoinTables[pos] = vectorMapJoinFastTableContainer;
+        LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() +
+          ". Small table position: " + pos);
       } catch (IOException e) {
         throw new HiveException(e);
       } catch (SerDeException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index be51693..3e9ff84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -18,13 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.serde2.WriteBuffers;
 
 // Optimized for sequential key lookup.
 
-public class VectorMapJoinFastKeyStore {
+public class VectorMapJoinFastKeyStore implements MemoryEstimate {
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastKeyStore.class.getName());
 
@@ -165,4 +166,12 @@ public class VectorMapJoinFastKeyStore {
     this.writeBuffers = writeBuffers;
     unsafeReadPos = new WriteBuffers.Position();
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long size = 0;
+    size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+    size += unsafeReadPos == null ? 0 : unsafeReadPos.getEstimatedMemorySize();
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
index 6fe98f9..d4847b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -37,7 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 public class VectorMapJoinFastLongHashMap
              extends VectorMapJoinFastLongHashTable
-             implements VectorMapJoinLongHashMap {
+             implements VectorMapJoinLongHashMap, MemoryEstimate {
 
   public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class);
 
@@ -112,4 +114,9 @@ public class VectorMapJoinFastLongHashMap
         initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     valueStore = new VectorMapJoinFastValueStore(writeBuffersSize);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
index 9140aee..566cfa2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
@@ -100,4 +100,9 @@ public class VectorMapJoinFastLongHashMultiSet
     super(minMaxEnabled, isOuterJoin, hashTableKeyType,
         initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
index d3efb11..fb7ae62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
@@ -96,4 +96,9 @@ public class VectorMapJoinFastLongHashSet
     super(minMaxEnabled, isOuterJoin, hashTableKeyType,
         initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 8bfa07c..54e667c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -280,4 +281,18 @@ public abstract class VectorMapJoinFastLongHashTable
     min = Long.MAX_VALUE;
     max = Long.MIN_VALUE;
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long size = super.getEstimatedMemorySize();
+    size += slotPairs == null ? 0 : jdm.lengthForLongArrayOfSize(slotPairs.length);
+    size += (2 * jdm.primitive2());
+    size += (2 * jdm.primitive1());
+    size += jdm.object();
+    // adding 16KB constant memory for keyBinarySortableDeserializeRead as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    size += (16 * 1024L);
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
index add4788..eb08aa9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
@@ -53,4 +53,9 @@ public class VectorMapJoinFastMultiKeyHashMap
         int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) {
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
index faefdbb..56964bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
@@ -52,4 +52,8 @@ public class VectorMapJoinFastMultiKeyHashMultiSet
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
 
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
index 5328910..46bafe0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
@@ -52,5 +52,8 @@ public class VectorMapJoinFastMultiKeyHashSet
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
 
-
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
index f13034f..d04590a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
@@ -43,4 +43,13 @@ public class VectorMapJoinFastStringHashMap extends VectorMapJoinFastBytesHashMa
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long size = 0;
+    // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    size += (16 * 1024L);
+    return super.getEstimatedMemorySize() + size;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
index 53ad7b4..b24bfdf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
@@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashMultiSet extends VectorMapJoinFastBytesH
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    long size = (16 * 1024L);
+    return super.getEstimatedMemorySize() + size;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
index 723c729..75fae25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
@@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashSet extends VectorMapJoinFastBytesHashSe
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    long size = (16 * 1024L);
+    return super.getEstimatedMemorySize() + size;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 05f1cf1..2fe4b93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,7 +27,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
-import org.apache.hadoop.hive.ql.exec.tez.HashTableLoader;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
  * HashTableLoader for Tez constructs the hashtable from records read from
@@ -46,7 +45,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
  */
 public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HashTableLoader.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastTableContainer.class.getName());
 
   private final MapJoinDesc desc;
   private final Configuration hconf;
@@ -219,6 +218,17 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
   }
 
   @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long size = 0;
+    size += vectorMapJoinFastHashTable.getEstimatedMemorySize();
+    size += (4 * jdm.primitive1());
+    size += (2 * jdm.object());
+    size += (jdm.primitive2());
+    return size;
+  }
+
+  @Override
   public void setSerde(MapJoinObjectSerDeContext keyCtx, MapJoinObjectSerDeContext valCtx)
       throws SerDeException {
     // Do nothing in this case.

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
index f9c5b34..3cd06e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
@@ -30,7 +31,7 @@ import com.google.common.base.Preconditions;
 
 // Supports random access.
 
-public class VectorMapJoinFastValueStore {
+public class VectorMapJoinFastValueStore implements MemoryEstimate {
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastValueStore.class.getName());
 
@@ -113,6 +114,11 @@ public class VectorMapJoinFastValueStore {
     return writeBuffers;
   }
 
+  @Override
+  public long getEstimatedMemorySize() {
+    return writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+  }
+
   public static class HashMapResult extends VectorMapJoinHashMapResult {
 
     private VectorMapJoinFastValueStore valueStore;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
index c7e585c..9cc9ad4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.BytesWritable;
@@ -28,7 +29,7 @@ import org.apache.hadoop.io.BytesWritable;
  * Root interface for a vector map join hash table (which could be a hash map, hash multi-set, or
  * hash set).
  */
-public interface VectorMapJoinHashTable {
+public interface VectorMapJoinHashTable extends MemoryEstimate {
 
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java
index 93a89d7..1560807 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashSet.java
@@ -75,4 +75,9 @@ public class VectorMapJoinOptimizedHashSet
       MapJoinTableContainer originalTableContainer, ReusableGetAdaptor hashMapRowGetter) {
     super(originalTableContainer, hashMapRowGetter);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
index 5fe7861..5275e1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -96,4 +97,12 @@ public abstract class VectorMapJoinOptimizedHashTable implements VectorMapJoinHa
   public int size() {
     return originalTableContainer.size();
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long size = 0;
+    size += originalTableContainer == null ? 0 : originalTableContainer.getEstimatedMemorySize();
+    size += (2 * JavaDataModel.get().object());
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java
index f921b9c..4b46ce0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringHashSet.java
@@ -60,4 +60,12 @@ public class VectorMapJoinOptimizedStringHashSet
     super(originalTableContainer, hashMapRowGetter);
     stringCommon =  new VectorMapJoinOptimizedStringCommon(isOuterJoin);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    long size = (16 * 1024L);
+    return super.getEstimatedMemorySize() + size;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
index 42ca4b7..fc5aea5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -102,36 +102,29 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
   //---------------------------------------------------------------------------
 
   // Whether there is to be a tag added to the end of each key and the tag value.
-  private transient boolean reduceSkipTag;
-  private transient byte reduceTagByte;
+  protected transient boolean reduceSkipTag;
+  protected transient byte reduceTagByte;
 
   // Binary sortable key serializer.
   protected transient BinarySortableSerializeWrite keyBinarySortableSerializeWrite;
 
-  // The serialized all null key and its hash code.
-  private transient byte[] nullBytes;
-  private transient int nullKeyHashCode;
-
   // Lazy binary value serializer.
-  private transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
+  protected transient LazyBinarySerializeWrite valueLazyBinarySerializeWrite;
 
   // This helper object serializes LazyBinary format reducer values from columns of a row
   // in a vectorized row batch.
-  private transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
+  protected transient VectorSerializeRow<LazyBinarySerializeWrite> valueVectorSerializeRow;
 
   // The output buffer used to serialize a value into.
-  private transient Output valueOutput;
+  protected transient Output valueOutput;
 
   // The hive key and bytes writable value needed to pass the key and value to the collector.
-  private transient HiveKey keyWritable;
-  private transient BytesWritable valueBytesWritable;
+  protected transient HiveKey keyWritable;
+  protected transient BytesWritable valueBytesWritable;
 
   // Where to write our key and value pairs.
   private transient OutputCollector out;
 
-  // The object that determines equal key series.
-  protected transient VectorKeySeriesSerialized serializedKeySeries;
-
   private transient long numRows = 0;
   private transient long cntr = 1;
   private transient long logEveryNRows = 0;
@@ -158,6 +151,8 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
       VectorizationContext vContext, OperatorDesc conf) throws HiveException {
     this(ctx);
 
+    LOG.info("VectorReduceSinkCommonOperator constructor");
+
     ReduceSinkDesc desc = (ReduceSinkDesc) conf;
     this.conf = desc;
     vectorDesc = (VectorReduceSinkDesc) desc.getVectorDesc();
@@ -247,6 +242,46 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
 
+    if (isLogDebugEnabled) {
+      LOG.debug("useUniformHash " + vectorReduceSinkInfo.getUseUniformHash());
+  
+      LOG.debug("reduceSinkKeyColumnMap " +
+          (vectorReduceSinkInfo.getReduceSinkKeyColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyColumnMap())));
+      LOG.debug("reduceSinkKeyTypeInfos " +
+          (vectorReduceSinkInfo.getReduceSinkKeyTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyTypeInfos())));
+      LOG.debug("reduceSinkKeyColumnVectorTypes " +
+          (vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyColumnVectorTypes())));
+      LOG.debug("reduceSinkKeyExpressions " +
+          (vectorReduceSinkInfo.getReduceSinkKeyExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkKeyExpressions())));
+  
+      LOG.debug("reduceSinkValueColumnMap " +
+          (vectorReduceSinkInfo.getReduceSinkValueColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueColumnMap())));
+      LOG.debug("reduceSinkValueTypeInfos " +
+          (vectorReduceSinkInfo.getReduceSinkValueTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueTypeInfos())));
+      LOG.debug("reduceSinkValueColumnVectorTypes " +
+          (vectorReduceSinkInfo.getReduceSinkValueColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueColumnVectorTypes())));
+      LOG.debug("reduceSinkValueExpressions " +
+          (vectorReduceSinkInfo.getReduceSinkValueExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkValueExpressions())));
+  
+      LOG.debug("reduceSinkBucketColumnMap " +
+          (vectorReduceSinkInfo.getReduceSinkBucketColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketColumnMap())));
+      LOG.debug("reduceSinkBucketTypeInfos " +
+          (vectorReduceSinkInfo.getReduceSinkBucketTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketTypeInfos())));
+      LOG.debug("reduceSinkBucketColumnVectorTypes " +
+          (vectorReduceSinkInfo.getReduceSinkBucketColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketColumnVectorTypes())));
+      LOG.debug("reduceSinkBucketExpressions " +
+          (vectorReduceSinkInfo.getReduceSinkBucketExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkBucketExpressions())));
+  
+      LOG.debug("reduceSinkPartitionColumnMap " +
+          (vectorReduceSinkInfo.getReduceSinkPartitionColumnMap() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionColumnMap())));
+      LOG.debug("reduceSinkPartitionTypeInfos " +
+          (vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos())));
+      LOG.debug("reduceSinkPartitionColumnVectorTypes " +
+          (vectorReduceSinkInfo.getReduceSinkPartitionColumnVectorTypes() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionColumnVectorTypes())));
+      LOG.debug("reduceSinkPartitionExpressions " +
+          (vectorReduceSinkInfo.getReduceSinkPartitionExpressions() == null ? "NULL" : Arrays.toString(vectorReduceSinkInfo.getReduceSinkPartitionExpressions())));
+    }
+
     if (LOG.isDebugEnabled()) {
       // Determine the name of our map or reduce task for debug tracing.
       BaseWork work = Utilities.getMapWork(hconf);
@@ -280,21 +315,6 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
     keyBinarySortableSerializeWrite = new BinarySortableSerializeWrite(columnSortOrder,
             columnNullMarker, columnNotNullMarker);
 
-    // Create all nulls key.
-    try {
-      Output nullKeyOutput = new Output();
-      keyBinarySortableSerializeWrite.set(nullKeyOutput);
-      for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) {
-        keyBinarySortableSerializeWrite.writeNull();
-      }
-      int nullBytesLength = nullKeyOutput.getLength();
-      nullBytes = new byte[nullBytesLength];
-      System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength);
-      nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength);
-    } catch (Exception e) {
-      throw new HiveException(e);
-    }
-
     valueLazyBinarySerializeWrite = new LazyBinarySerializeWrite(reduceSinkValueColumnMap.length);
 
     valueVectorSerializeRow =
@@ -312,101 +332,6 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
     batchCounter = 0;
   }
 
-  @Override
-  public void process(Object row, int tag) throws HiveException {
-
-    try {
-      VectorizedRowBatch batch = (VectorizedRowBatch) row;
-
-      batchCounter++;
-
-      if (batch.size == 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
-        }
-        return;
-      }
-
-      // Perform any key expressions.  Results will go into scratch columns.
-      if (reduceSinkKeyExpressions != null) {
-        for (VectorExpression ve : reduceSinkKeyExpressions) {
-          ve.evaluate(batch);
-        }
-      }
-
-      // Perform any value expressions.  Results will go into scratch columns.
-      if (reduceSinkValueExpressions != null) {
-        for (VectorExpression ve : reduceSinkValueExpressions) {
-          ve.evaluate(batch);
-        }
-      }
-
-      serializedKeySeries.processBatch(batch);
-
-      boolean selectedInUse = batch.selectedInUse;
-      int[] selected = batch.selected;
-
-      int keyLength;
-      int logical;
-      int end;
-      int batchIndex;
-      do {
-        if (serializedKeySeries.getCurrentIsAllNull()) {
-
-          // Use the same logic as ReduceSinkOperator.toHiveKey.
-          //
-          if (tag == -1 || reduceSkipTag) {
-            keyWritable.set(nullBytes, 0, nullBytes.length);
-          } else {
-            keyWritable.setSize(nullBytes.length + 1);
-            System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length);
-            keyWritable.get()[nullBytes.length] = reduceTagByte;
-          }
-          keyWritable.setDistKeyLength(nullBytes.length);
-          keyWritable.setHashCode(nullKeyHashCode);
-
-        } else {
-
-          // One serialized key for 1 or more rows for the duplicate keys.
-          // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength());
-          // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength());
-          keyLength = serializedKeySeries.getSerializedLength();
-          if (tag == -1 || reduceSkipTag) {
-            keyWritable.set(serializedKeySeries.getSerializedBytes(),
-                serializedKeySeries.getSerializedStart(), keyLength);
-          } else {
-            keyWritable.setSize(keyLength + 1);
-            System.arraycopy(serializedKeySeries.getSerializedBytes(),
-                serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength);
-            keyWritable.get()[keyLength] = reduceTagByte;
-          }
-          keyWritable.setDistKeyLength(keyLength);
-          keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode());
-        }
-
-        logical = serializedKeySeries.getCurrentLogical();
-        end = logical + serializedKeySeries.getCurrentDuplicateCount();
-        do {
-          batchIndex = (selectedInUse ? selected[logical] : logical);
-
-          valueLazyBinarySerializeWrite.reset();
-          valueVectorSerializeRow.serializeWrite(batch, batchIndex);
-
-          valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
-
-          collect(keyWritable, valueBytesWritable);
-        } while (++logical < end);
-  
-        if (!serializedKeySeries.next()) {
-          break;
-        }
-      } while (true);
-
-    } catch (Exception e) {
-      throw new HiveException(e);
-    }
-  }
-
   protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
index 325f773..0bc1cd1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 /*
  * Specialized class for native vectorized reduce sink that is reducing on a single long key column.
  */
-public class VectorReduceSinkLongOperator extends VectorReduceSinkCommonOperator {
+public class VectorReduceSinkLongOperator extends VectorReduceSinkUniformHashOperator {
 
   private static final long serialVersionUID = 1L;
   private static final String CLASS_NAME = VectorReduceSinkLongOperator.class.getName();

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
index 2027187..1cca94d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerialize
  * Specialized class for native vectorized reduce sink that is reducing on multiple key columns
  * (or a single non-long / non-string column).
  */
-public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkCommonOperator {
+public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkUniformHashOperator {
 
   private static final long serialVersionUID = 1L;
   private static final String CLASS_NAME = VectorReduceSinkMultiKeyOperator.class.getName();

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
new file mode 100644
index 0000000..6312c44
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkObjectHashOperator.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hive.common.util.HashCodeUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is uniform hash (common) operator class for native vectorized reduce sink.
+ */
+public class VectorReduceSinkObjectHashOperator extends VectorReduceSinkCommonOperator {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorReduceSinkObjectHashOperator.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  protected int[] reduceSinkBucketColumnMap;
+  protected TypeInfo[] reduceSinkBucketTypeInfos;
+
+  protected VectorExpression[] reduceSinkBucketExpressions;
+
+  protected int[] reduceSinkPartitionColumnMap;
+  protected TypeInfo[] reduceSinkPartitionTypeInfos;
+
+  protected VectorExpression[] reduceSinkPartitionExpressions;
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  protected transient Output keyOutput;
+  protected transient VectorSerializeRow<BinarySortableSerializeWrite> keyVectorSerializeRow;
+
+  private transient boolean hasBuckets;
+  private transient int numBuckets;
+  private transient ObjectInspector[] bucketObjectInspectors;
+  private transient VectorExtractRow bucketVectorExtractRow;
+  private transient Object[] bucketFieldValues;
+
+  private transient boolean isPartitioned;
+  private transient ObjectInspector[] partitionObjectInspectors;
+  private transient VectorExtractRow partitionVectorExtractRow;
+  private transient Object[] partitionFieldValues;
+  private transient Random nonPartitionRandom;
+
+  /** Kryo ctor. */
+  protected VectorReduceSinkObjectHashOperator() {
+    super();
+  }
+
+  public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorReduceSinkObjectHashOperator(CompilationOpContext ctx,
+      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+    super(ctx, vContext, conf);
+
+    LOG.info("VectorReduceSinkObjectHashOperator constructor vectorReduceSinkInfo " + vectorReduceSinkInfo);
+
+    // This the is Object Hash class variation.
+    Preconditions.checkState(!vectorReduceSinkInfo.getUseUniformHash());
+
+    reduceSinkBucketColumnMap = vectorReduceSinkInfo.getReduceSinkBucketColumnMap();
+    reduceSinkBucketTypeInfos = vectorReduceSinkInfo.getReduceSinkBucketTypeInfos();
+    reduceSinkBucketExpressions = vectorReduceSinkInfo.getReduceSinkBucketExpressions();
+
+    reduceSinkPartitionColumnMap = vectorReduceSinkInfo.getReduceSinkPartitionColumnMap();
+    reduceSinkPartitionTypeInfos = vectorReduceSinkInfo.getReduceSinkPartitionTypeInfos();
+    reduceSinkPartitionExpressions = vectorReduceSinkInfo.getReduceSinkPartitionExpressions();
+  }
+
+  private ObjectInspector[] getObjectInspectorArray(TypeInfo[] typeInfos) {
+    final int size = typeInfos.length;
+    ObjectInspector[] objectInspectors = new ObjectInspector[size];
+    for(int i = 0; i < size; i++) {
+      TypeInfo typeInfo = typeInfos[i];
+      ObjectInspector standardWritableObjectInspector =
+              TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo);
+      objectInspectors[i] = standardWritableObjectInspector;
+    }
+    return objectInspectors;
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    keyOutput = new Output();
+    keyBinarySortableSerializeWrite.set(keyOutput);
+    keyVectorSerializeRow =
+        new VectorSerializeRow<BinarySortableSerializeWrite>(
+            keyBinarySortableSerializeWrite);
+    keyVectorSerializeRow.init(reduceSinkKeyTypeInfos, reduceSinkKeyColumnMap);
+ 
+    hasBuckets = false;
+    isPartitioned = false;
+    numBuckets = 0;
+ 
+    // Object Hash.
+
+    numBuckets = conf.getNumBuckets();
+    hasBuckets = (numBuckets > 0);
+
+    if (hasBuckets) {
+      bucketObjectInspectors = getObjectInspectorArray(reduceSinkBucketTypeInfos);
+      bucketVectorExtractRow = new VectorExtractRow();
+      bucketVectorExtractRow.init(reduceSinkBucketTypeInfos, reduceSinkBucketColumnMap);
+      bucketFieldValues = new Object[reduceSinkBucketTypeInfos.length];
+    }
+  
+    isPartitioned = (conf.getPartitionCols() != null);
+    if (!isPartitioned) {
+      nonPartitionRandom = new Random(12345);
+    } else {
+      partitionObjectInspectors = getObjectInspectorArray(reduceSinkPartitionTypeInfos);
+      LOG.debug("*NEW* partitionObjectInspectors " + Arrays.toString(partitionObjectInspectors));
+      partitionVectorExtractRow = new VectorExtractRow();
+      partitionVectorExtractRow.init(reduceSinkPartitionTypeInfos, reduceSinkPartitionColumnMap);
+      partitionFieldValues = new Object[reduceSinkPartitionTypeInfos.length];
+    }
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      batchCounter++;
+
+      if (batch.size == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+        }
+        return;
+      }
+
+      // Perform any key expressions.  Results will go into scratch columns.
+      if (reduceSinkKeyExpressions != null) {
+        for (VectorExpression ve : reduceSinkKeyExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+  
+      // Perform any value expressions.  Results will go into scratch columns.
+      if (reduceSinkValueExpressions != null) {
+        for (VectorExpression ve : reduceSinkValueExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+  
+      // Perform any bucket expressions.  Results will go into scratch columns.
+      if (reduceSinkBucketExpressions != null) {
+        for (VectorExpression ve : reduceSinkBucketExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+  
+      // Perform any partition expressions.  Results will go into scratch columns.
+      if (reduceSinkPartitionExpressions != null) {
+        for (VectorExpression ve : reduceSinkPartitionExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+  
+      final boolean selectedInUse = batch.selectedInUse;
+      int[] selected = batch.selected;
+
+      final int size = batch.size;
+      for (int logical = 0; logical < size; logical++) {
+        final int batchIndex = (selectedInUse ? selected[logical] : logical);
+  
+        final int hashCode;
+        if (!hasBuckets) {
+          if (!isPartitioned) {
+            hashCode = nonPartitionRandom.nextInt();
+          } else {
+            partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
+            hashCode = 
+                ObjectInspectorUtils.getBucketHashCode(
+                    partitionFieldValues, partitionObjectInspectors);
+          }
+        } else {
+          bucketVectorExtractRow.extractRow(batch, batchIndex, bucketFieldValues);
+          final int bucketNum =
+              ObjectInspectorUtils.getBucketNumber(
+                  bucketFieldValues, bucketObjectInspectors, numBuckets);
+          if (!isPartitioned) {
+            hashCode = nonPartitionRandom.nextInt() * 31 + bucketNum;
+          } else {
+            partitionVectorExtractRow.extractRow(batch, batchIndex, partitionFieldValues);
+            hashCode = 
+                ObjectInspectorUtils.getBucketHashCode(
+                    partitionFieldValues, partitionObjectInspectors) * 31 + bucketNum;
+          }
+        }
+  
+        keyBinarySortableSerializeWrite.reset();
+        keyVectorSerializeRow.serializeWrite(batch, batchIndex);
+  
+        // One serialized key for 1 or more rows for the duplicate keys.
+        final int keyLength = keyOutput.getLength();
+        if (tag == -1 || reduceSkipTag) {
+          keyWritable.set(keyOutput.getData(), 0, keyLength);
+        } else {
+          keyWritable.setSize(keyLength + 1);
+          System.arraycopy(keyOutput.getData(), 0, keyWritable.get(), 0, keyLength);
+          keyWritable.get()[keyLength] = reduceTagByte;
+        }
+        keyWritable.setDistKeyLength(keyLength);
+        keyWritable.setHashCode(hashCode);
+  
+        valueLazyBinarySerializeWrite.reset();
+        valueVectorSerializeRow.serializeWrite(batch, batchIndex);
+  
+        valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
+  
+        collect(keyWritable, valueBytesWritable);
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
index b655e6e..a838f4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 /*
  * Specialized class for native vectorized reduce sink that is reducing on a single long key column.
  */
-public class VectorReduceSinkStringOperator extends VectorReduceSinkCommonOperator {
+public class VectorReduceSinkStringOperator extends VectorReduceSinkUniformHashOperator {
 
   private static final long serialVersionUID = 1L;
   private static final String CLASS_NAME = VectorReduceSinkStringOperator.class.getName();

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
new file mode 100644
index 0000000..2dfa721
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkUniformHashOperator.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.reducesink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.Counter;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesSerialized;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hive.common.util.HashCodeUtil;
+
+/**
+ * This class is uniform hash (common) operator class for native vectorized reduce sink.
+ */
+public abstract class VectorReduceSinkUniformHashOperator extends VectorReduceSinkCommonOperator {
+
+  private static final long serialVersionUID = 1L;
+  private static final String CLASS_NAME = VectorReduceSinkUniformHashOperator.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  // The serialized all null key and its hash code.
+  private transient byte[] nullBytes;
+  private transient int nullKeyHashCode;
+
+  // The object that determines equal key series.
+  protected transient VectorKeySeriesSerialized serializedKeySeries;
+
+
+  /** Kryo ctor. */
+  protected VectorReduceSinkUniformHashOperator() {
+    super();
+  }
+
+  public VectorReduceSinkUniformHashOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorReduceSinkUniformHashOperator(CompilationOpContext ctx,
+      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+    super(ctx, vContext, conf);
+  }
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+
+    // Create all nulls key.
+    try {
+      Output nullKeyOutput = new Output();
+      keyBinarySortableSerializeWrite.set(nullKeyOutput);
+      for (int i = 0; i < reduceSinkKeyColumnMap.length; i++) {
+        keyBinarySortableSerializeWrite.writeNull();
+      }
+      int nullBytesLength = nullKeyOutput.getLength();
+      nullBytes = new byte[nullBytesLength];
+      System.arraycopy(nullKeyOutput.getData(), 0, nullBytes, 0, nullBytesLength);
+      nullKeyHashCode = HashCodeUtil.calculateBytesHashCode(nullBytes, 0, nullBytesLength);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      batchCounter++;
+
+      if (batch.size == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+        }
+        return;
+      }
+
+      // Perform any key expressions.  Results will go into scratch columns.
+      if (reduceSinkKeyExpressions != null) {
+        for (VectorExpression ve : reduceSinkKeyExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+
+      // Perform any value expressions.  Results will go into scratch columns.
+      if (reduceSinkValueExpressions != null) {
+        for (VectorExpression ve : reduceSinkValueExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+
+      serializedKeySeries.processBatch(batch);
+
+      boolean selectedInUse = batch.selectedInUse;
+      int[] selected = batch.selected;
+
+      int keyLength;
+      int logical;
+      int end;
+      int batchIndex;
+      do {
+        if (serializedKeySeries.getCurrentIsAllNull()) {
+
+          // Use the same logic as ReduceSinkOperator.toHiveKey.
+          //
+          if (tag == -1 || reduceSkipTag) {
+            keyWritable.set(nullBytes, 0, nullBytes.length);
+          } else {
+            keyWritable.setSize(nullBytes.length + 1);
+            System.arraycopy(nullBytes, 0, keyWritable.get(), 0, nullBytes.length);
+            keyWritable.get()[nullBytes.length] = reduceTagByte;
+          }
+          keyWritable.setDistKeyLength(nullBytes.length);
+          keyWritable.setHashCode(nullKeyHashCode);
+
+        } else {
+
+          // One serialized key for 1 or more rows for the duplicate keys.
+          // LOG.info("reduceSkipTag " + reduceSkipTag + " tag " + tag + " reduceTagByte " + (int) reduceTagByte + " keyLength " + serializedKeySeries.getSerializedLength());
+          // LOG.info("process offset " + serializedKeySeries.getSerializedStart() + " length " + serializedKeySeries.getSerializedLength());
+          keyLength = serializedKeySeries.getSerializedLength();
+          if (tag == -1 || reduceSkipTag) {
+            keyWritable.set(serializedKeySeries.getSerializedBytes(),
+                serializedKeySeries.getSerializedStart(), keyLength);
+          } else {
+            keyWritable.setSize(keyLength + 1);
+            System.arraycopy(serializedKeySeries.getSerializedBytes(),
+                serializedKeySeries.getSerializedStart(), keyWritable.get(), 0, keyLength);
+            keyWritable.get()[keyLength] = reduceTagByte;
+          }
+          keyWritable.setDistKeyLength(keyLength);
+          keyWritable.setHashCode(serializedKeySeries.getCurrentHashCode());
+        }
+
+        logical = serializedKeySeries.getCurrentLogical();
+        end = logical + serializedKeySeries.getCurrentDuplicateCount();
+        do {
+          batchIndex = (selectedInUse ? selected[logical] : logical);
+
+          valueLazyBinarySerializeWrite.reset();
+          valueVectorSerializeRow.serializeWrite(batch, batchIndex);
+
+          valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
+
+          collect(keyWritable, valueBytesWritable);
+        } while (++logical < end);
+  
+        if (!serializedKeySeries.next()) {
+          break;
+        }
+      } while (true);
+
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
index 6582cdd..c23d202 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java
@@ -46,13 +46,12 @@ import org.apache.hadoop.mapred.Counters.Group;
  * Each session uses a new object, which creates a new file.
  */
 public class HiveHistoryImpl implements HiveHistory{
+  private static final Logger LOG = LoggerFactory.getLogger("hive.ql.exec.HiveHistoryImpl");
 
   PrintWriter histStream; // History File stream
 
   String histFileName; // History file name
 
-  private static final Logger LOG = LoggerFactory.getLogger("hive.ql.exec.HiveHistoryImpl");
-
   private static final Random randGen = new Random();
 
   private LogHelper console;
@@ -305,7 +304,7 @@ public class HiveHistoryImpl implements HiveHistory{
   /**
    * write out counters.
    */
-  static ThreadLocal<Map<String,String>> ctrMapFactory =
+  static final ThreadLocal<Map<String,String>> ctrMapFactory =
       new ThreadLocal<Map<String, String>>() {
     @Override
     protected Map<String,String> initialValue() {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
index 2f0bd88..4380fe3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,66 +18,26 @@
 
 package org.apache.hadoop.hive.ql.hooks;
 
-import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 
-public class HookUtils {
-  /**
-   * Returns the hooks specified in a configuration variable.  The hooks are returned
-   * in a list in the order they were specified in the configuration variable.
-   *
-   * @param conf        Configuration object
-   * @param hookConfVar The configuration variable specifying a comma separated list
-   *                    of the hook class names.
-   * @param clazz       The super type of the hooks.
-   * @return            A list of the hooks cast as the type specified in clazz,
-   *                    in the order they are listed in the value of hookConfVar
-   * @throws ClassNotFoundException
-   * @throws IllegalAccessException
-   * @throws InstantiationException
-   */
-  public static <T extends Hook> List<T> getHooks(HiveConf conf,
-      ConfVars hookConfVar, Class<T> clazz)
-      throws InstantiationException, IllegalAccessException, ClassNotFoundException  {
-    String csHooks = conf.getVar(hookConfVar);
-    List<T> hooks = new ArrayList<T>();
-    if (csHooks == null) {
-      return hooks;
-    }
 
-    csHooks = csHooks.trim();
-    if (csHooks.equals("")) {
-      return hooks;
-    }
-
-    String[] hookClasses = csHooks.split(",");
-    for (String hookClass : hookClasses) {
-        T hook = (T) Class.forName(hookClass.trim(), true,
-                Utilities.getSessionSpecifiedClassLoader()).newInstance();
-        hooks.add(hook);
-    }
-
-    return hooks;
-  }
+public class HookUtils {
 
   public static String redactLogString(HiveConf conf, String logString)
-      throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+          throws InstantiationException, IllegalAccessException, ClassNotFoundException {
 
     String redactedString = logString;
 
     if (conf != null && logString != null) {
-      List<Redactor> queryRedactors = getHooks(conf, ConfVars.QUERYREDACTORHOOKS, Redactor.class);
+      List<Redactor> queryRedactors = new HooksLoader(conf).getHooks(ConfVars.QUERYREDACTORHOOKS);
       for (Redactor redactor : queryRedactors) {
         redactor.setConf(conf);
         redactedString = redactor.redactQuery(redactedString);
       }
     }
-
     return redactedString;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/HooksLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HooksLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HooksLoader.java
new file mode 100644
index 0000000..0008726
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HooksLoader.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+
+/**
+ * A loader class for {@link Hook}s. The class provides a way to create and instantiate {@link Hook} objects. The
+ * methodology for how hooks are loaded is left up to the individual methods.
+ */
+public class HooksLoader {
+
+  private final HiveConf conf;
+
+  /**
+   * Creates a new {@link HooksLoader} that uses the specified {@link HiveConf} to load the {@link Hook}s.
+   *
+   * @param conf the {@link HiveConf} to use when loading the {@link Hook}s
+   */
+  public HooksLoader(HiveConf conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Delegates to {@link #getHooks(HiveConf.ConfVars)} and prints the to the specified {@link SessionState.LogHelper} if
+   * a {@link ClassNotFoundException} is thrown.
+   *
+   * @param hookConfVar the configuration variable specifying a comma separated list of the hook class names
+   * @param console the {@link SessionState.LogHelper} to print to if a {@link ClassNotFoundException} is thrown by the
+   *                {@link #getHooks(HiveConf.ConfVars)} method
+   *
+   * @return a list of the hooks objects, in the order they are listed in the value of hookConfVar
+   *
+   * @throws ClassNotFoundException if the specified class names could not be found
+   * @throws IllegalAccessException if the specified class names could not be accessed
+   * @throws InstantiationException if the specified class names could not be instantiated
+   */
+  public <T extends Hook> List<T> getHooks(HiveConf.ConfVars hookConfVar, SessionState.LogHelper console)
+          throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+    try {
+      return getHooks(hookConfVar);
+    } catch (ClassNotFoundException e) {
+      console.printError(hookConfVar.varname + " Class not found: " + e.getMessage());
+      throw e;
+    }
+  }
+
+  /**
+   * Returns the hooks specified in a configuration variable. The hooks are returned in a list in the order they were
+   * specified in the configuration variable. The value of the specified conf variable should be a comma separated list
+   * of class names where each class implements the {@link Hook} interface. The method uses reflection to an instance
+   * of each class and then returns them in a {@link List}.
+   *
+   * @param hookConfVar The configuration variable specifying a comma separated list of the hook class names
+   *
+   * @return a list of the hooks objects, in the order they are listed in the value of hookConfVar
+   *
+   * @throws ClassNotFoundException if the specified class names could not be found
+   * @throws IllegalAccessException if the specified class names could not be accessed
+   * @throws InstantiationException if the specified class names could not be instantiated
+   */
+  public <T extends Hook> List<T> getHooks(HiveConf.ConfVars hookConfVar)
+          throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+    String csHooks = conf.getVar(hookConfVar);
+    ImmutableList.Builder<T> hooks = ImmutableList.builder();
+    if (csHooks == null) {
+      return ImmutableList.of();
+    }
+
+    csHooks = csHooks.trim();
+    if (csHooks.isEmpty()) {
+      return ImmutableList.of();
+    }
+
+    String[] hookClasses = csHooks.split(",");
+    for (String hookClass : hookClasses) {
+      T hook = (T) Class.forName(hookClass.trim(), true,
+              Utilities.getSessionSpecifiedClassLoader()).newInstance();
+      hooks.add(hook);
+    }
+
+    return hooks.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
index 2806c54..7305436 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.commons.collections.SetUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -403,7 +404,7 @@ public class LineageInfo implements Serializable {
      * @param expr the expr to set
      */
     public void setExpr(String expr) {
-      this.expr = expr;
+      this.expr = StringInternUtils.internIfNotNull(expr);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java
index 18ef325..ac79ceb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecOrcRowGroupCountPrinter.java
@@ -60,10 +60,10 @@ public class PostExecOrcRowGroupCountPrinter implements ExecuteWithHookContext {
       if (counters != null) {
         for (CounterGroup group : counters) {
           if (group.getName().equals(LlapIOCounters.class.getName())) {
-            console.printError(tezTask.getId() + " LLAP IO COUNTERS:");
+            console.printInfo(tezTask.getId() + " LLAP IO COUNTERS:", false);
             for (TezCounter counter : group) {
               if (counter.getDisplayName().equals(LlapIOCounters.SELECTED_ROWGROUPS.name())) {
-                console.printError("   " + counter.getDisplayName() + ": " + counter.getValue());
+                console.printInfo("   " + counter.getDisplayName() + ": " + counter.getValue(), false);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
index 412f45c..45bd6e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecTezSummaryPrinter.java
@@ -62,25 +62,25 @@ public class PostExecTezSummaryPrinter implements ExecuteWithHookContext {
         String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
         for (CounterGroup group : counters) {
           if (hiveCountersGroup.equals(group.getDisplayName())) {
-            console.printError(tezTask.getId() + " HIVE COUNTERS:");
+            console.printInfo(tezTask.getId() + " HIVE COUNTERS:", false);
             for (TezCounter counter : group) {
-              console.printError("   " + counter.getDisplayName() + ": " + counter.getValue());
+              console.printInfo("   " + counter.getDisplayName() + ": " + counter.getValue(), false);
             }
           } else if (group.getName().equals(FileSystemCounter.class.getName())) {
-            console.printError(tezTask.getId() + " FILE SYSTEM COUNTERS:");
+            console.printInfo(tezTask.getId() + " FILE SYSTEM COUNTERS:", false);
             for (TezCounter counter : group) {
               // HDFS counters should be relatively consistent across test runs when compared to
               // local file system counters
               if (counter.getName().contains("HDFS")) {
-                console.printError("   " + counter.getDisplayName() + ": " + counter.getValue());
+                console.printInfo("   " + counter.getDisplayName() + ": " + counter.getValue(), false);
               }
             }
           } else if (group.getName().equals(LlapIOCounters.class.getName())) {
-            console.printError(tezTask.getId() + " LLAP IO COUNTERS:");
+            console.printInfo(tezTask.getId() + " LLAP IO COUNTERS:", false);
             List<String> testSafeCounters = LlapIOCounters.testSafeCounterNames();
             for (TezCounter counter : group) {
               if (testSafeCounters.contains(counter.getDisplayName())) {
-                console.printError("   " + counter.getDisplayName() + ": " + counter.getValue());
+                console.printInfo("   " + counter.getDisplayName() + ": " + counter.getValue(), false);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
index b4fc125..3e74396 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PostExecutePrinter.java
@@ -116,8 +116,8 @@ public class PostExecutePrinter implements ExecuteWithHookContext {
     }
 
     if (queryState != null) {
-      console.printError("POSTHOOK: query: " + queryState.getQueryString().trim());
-      console.printError("POSTHOOK: type: " + queryState.getCommandType());
+      console.printInfo("POSTHOOK: query: " + queryState.getQueryString().trim(), false);
+      console.printInfo("POSTHOOK: type: " + queryState.getCommandType(), false);
     }
 
     PreExecutePrinter.printEntities(console, inputs, "POSTHOOK: Input: ");
@@ -167,7 +167,7 @@ public class PostExecutePrinter implements ExecuteWithHookContext {
         }
         sb.append("]");
 
-        console.printError(sb.toString());
+        console.printInfo(sb.toString(), false);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
index 232c62d..20acfb1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/PreExecutePrinter.java
@@ -65,8 +65,8 @@ public class PreExecutePrinter implements ExecuteWithHookContext {
     }
 
     if (queryState != null) {
-      console.printError("PREHOOK: query: " + queryState.getQueryString().trim());
-      console.printError("PREHOOK: type: " + queryState.getCommandType());
+      console.printInfo("PREHOOK: query: " + queryState.getQueryString().trim(), false);
+      console.printInfo("PREHOOK: type: " + queryState.getCommandType(), false);
     }
 
     printEntities(console, inputs, "PREHOOK: Input: ");
@@ -80,7 +80,7 @@ public class PreExecutePrinter implements ExecuteWithHookContext {
     }
     Collections.sort(strings);
     for (String s : strings) {
-      console.printError(prefix + s);
+      console.printInfo(prefix + s, false);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java
index 5340848..1845121 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookContextImpl.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.hive.ql.hooks;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 
+
 public class QueryLifeTimeHookContextImpl implements QueryLifeTimeHookContext {
+
   private HiveConf conf;
   private String command;
-  private HookContext hc = null;
+  private HookContext hc;
 
   @Override
   public HiveConf getHiveConf() {
@@ -54,4 +56,34 @@ public class QueryLifeTimeHookContextImpl implements QueryLifeTimeHookContext {
   public void setHookContext(HookContext hc) {
     this.hc = hc;
   }
+
+  public static class Builder {
+
+    private HiveConf conf;
+    private String command;
+    private HookContext hc;
+
+    public Builder withHiveConf(HiveConf conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder withCommand(String command) {
+      this.command = command;
+      return this;
+    }
+
+    public Builder withHookContext(HookContext hc) {
+      this.hc = hc;
+      return this;
+    }
+
+    public QueryLifeTimeHookContextImpl build() {
+      QueryLifeTimeHookContextImpl queryLifeTimeHookContext = new QueryLifeTimeHookContextImpl();
+      queryLifeTimeHookContext.setHiveConf(this.conf);
+      queryLifeTimeHookContext.setCommand(this.command);
+      queryLifeTimeHookContext.setHookContext(this.hc);
+      return queryLifeTimeHookContext;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookWithParseHooks.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookWithParseHooks.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookWithParseHooks.java
new file mode 100644
index 0000000..787590d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/QueryLifeTimeHookWithParseHooks.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.hooks;
+
+
+/**
+ * Extension of {@link QueryLifeTimeHook} that has hooks for pre and post parsing of a query.
+ */
+public interface QueryLifeTimeHookWithParseHooks extends QueryLifeTimeHook {
+
+  /**
+   * Invoked before a query enters the parse phase.
+   *
+   * @param ctx the context for the hook
+   */
+  void beforeParse(QueryLifeTimeHookContext ctx);
+
+  /**
+   * Invoked after a query parsing. Note: if 'hasError' is true,
+   * the query won't enter the following compilation phase.
+   *
+   * @param ctx the context for the hook
+   * @param hasError whether any error occurred during compilation.
+   */
+  void afterParse(QueryLifeTimeHookContext ctx, boolean hasError);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
index a1408e9..2c3ba7f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndex.java
@@ -26,10 +26,8 @@ import org.slf4j.LoggerFactory;
  * Holds index related constants
  */
 public class HiveIndex {
-
   public static final Logger l4j = LoggerFactory.getLogger("HiveIndex");
-
-  public static String INDEX_TABLE_CREATETIME = "hive.index.basetbl.dfs.lastModifiedTime";
+  public static final String INDEX_TABLE_CREATETIME = "hive.index.basetbl.dfs.lastModifiedTime";
 
   public static enum IndexType {
     AGGREGATE_TABLE("aggregate",  AggregateIndexHandler.class.getName()),