You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/05/02 17:58:01 UTC

[2/2] hive git commit: HIVE-16546: LLAP: Fail map join tasks if hash table memory exceeds threshold (Prasanth Jayachandran reviewed by Sergey Shelukhin)

HIVE-16546: LLAP: Fail map join tasks if hash table memory exceeds threshold (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ffff404
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ffff404
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ffff404

Branch: refs/heads/master
Commit: 0ffff404088a428da752a60a0847f51845e618ff
Parents: c2637e6
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue May 2 10:53:19 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue May 2 10:53:19 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/MemoryEstimate.java      | 29 ++++++++++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 ++
 .../llap/IncrementalObjectSizeEstimator.java    |  4 +-
 .../UDAFTemplates/VectorUDAFAvg.txt             |  2 +-
 .../UDAFTemplates/VectorUDAFMinMax.txt          |  2 +-
 .../UDAFTemplates/VectorUDAFMinMaxDecimal.txt   |  2 +-
 .../VectorUDAFMinMaxIntervalDayTime.txt         |  2 +-
 .../UDAFTemplates/VectorUDAFMinMaxString.txt    |  4 +-
 .../UDAFTemplates/VectorUDAFMinMaxTimestamp.txt |  2 +-
 .../UDAFTemplates/VectorUDAFSum.txt             |  2 +-
 .../UDAFTemplates/VectorUDAFVar.txt             |  2 +-
 .../UDAFTemplates/VectorUDAFVarDecimal.txt      |  4 +-
 .../mapjoin/MapJoinMemoryExhaustionError.java   | 28 ++++++++++
 .../MapJoinMemoryExhaustionException.java       | 29 ----------
 .../mapjoin/MapJoinMemoryExhaustionHandler.java |  6 +-
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java |  5 +-
 .../persistence/BytesBytesMultiHashMap.java     | 17 +++++-
 .../ql/exec/persistence/HashMapWrapper.java     | 10 +++-
 .../persistence/HybridHashTableContainer.java   |  5 ++
 .../persistence/MapJoinBytesTableContainer.java | 58 +++++++++++++++++++-
 .../exec/persistence/MapJoinTableContainer.java |  3 +-
 .../hive/ql/exec/tez/HashTableLoader.java       | 42 ++++++++++++--
 .../hadoop/hive/ql/exec/tez/TezProcessor.java   | 11 +++-
 .../vector/VectorAggregationBufferBatch.java    |  4 +-
 .../ql/exec/vector/VectorGroupByOperator.java   |  2 +-
 .../aggregates/VectorAggregateExpression.java   |  2 +-
 .../aggregates/VectorUDAFAvgDecimal.java        |  2 +-
 .../aggregates/VectorUDAFAvgTimestamp.java      |  2 +-
 .../aggregates/VectorUDAFBloomFilter.java       |  4 +-
 .../aggregates/VectorUDAFBloomFilterMerge.java  |  2 +-
 .../expressions/aggregates/VectorUDAFCount.java |  2 +-
 .../aggregates/VectorUDAFCountMerge.java        |  2 +-
 .../aggregates/VectorUDAFCountStar.java         |  2 +-
 .../aggregates/VectorUDAFStdPopTimestamp.java   |  2 +-
 .../aggregates/VectorUDAFStdSampTimestamp.java  |  2 +-
 .../aggregates/VectorUDAFSumDecimal.java        |  2 +-
 .../aggregates/VectorUDAFVarPopTimestamp.java   |  2 +-
 .../aggregates/VectorUDAFVarSampTimestamp.java  |  2 +-
 .../fast/VectorMapJoinFastBytesHashMap.java     |  5 ++
 .../VectorMapJoinFastBytesHashMultiSet.java     |  5 ++
 .../fast/VectorMapJoinFastBytesHashSet.java     |  5 ++
 .../fast/VectorMapJoinFastBytesHashTable.java   |  6 ++
 .../fast/VectorMapJoinFastHashTable.java        |  7 +++
 .../fast/VectorMapJoinFastHashTableLoader.java  | 47 +++++++++++++++-
 .../mapjoin/fast/VectorMapJoinFastKeyStore.java | 11 +++-
 .../fast/VectorMapJoinFastLongHashMap.java      |  9 ++-
 .../fast/VectorMapJoinFastLongHashMultiSet.java |  5 ++
 .../fast/VectorMapJoinFastLongHashSet.java      |  5 ++
 .../fast/VectorMapJoinFastLongHashTable.java    | 15 +++++
 .../fast/VectorMapJoinFastMultiKeyHashMap.java  |  5 ++
 .../VectorMapJoinFastMultiKeyHashMultiSet.java  |  4 ++
 .../fast/VectorMapJoinFastMultiKeyHashSet.java  |  5 +-
 .../fast/VectorMapJoinFastStringHashMap.java    |  9 +++
 .../VectorMapJoinFastStringHashMultiSet.java    |  8 +++
 .../fast/VectorMapJoinFastStringHashSet.java    |  8 +++
 .../fast/VectorMapJoinFastTableContainer.java   | 16 +++++-
 .../fast/VectorMapJoinFastValueStore.java       |  8 ++-
 .../hashtable/VectorMapJoinHashTable.java       |  3 +-
 .../VectorMapJoinOptimizedHashSet.java          |  5 ++
 .../VectorMapJoinOptimizedHashTable.java        |  9 +++
 .../VectorMapJoinOptimizedStringHashSet.java    |  8 +++
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  3 +-
 .../hive/ql/optimizer/MapJoinProcessor.java     |  8 +--
 .../calcite/translator/HiveOpConverter.java     |  2 +-
 .../physical/GenMRSkewJoinProcessor.java        |  3 +-
 .../physical/GenSparkSkewJoinProcessor.java     |  3 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  2 +-
 .../apache/hadoop/hive/ql/plan/JoinDesc.java    | 17 +++++-
 .../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 12 ++--
 .../ql/udf/generic/GenericUDAFComputeStats.java | 22 ++++----
 .../TestMapJoinMemoryExhaustionHandler.java     |  4 +-
 .../apache/hadoop/hive/serde2/WriteBuffers.java | 25 ++++++++-
 .../hadoop/hive/ql/util/JavaDataModel.java      | 26 ++++-----
 73 files changed, 509 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
new file mode 100644
index 0000000..36ae56f
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/MemoryEstimate.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+/**
+ * Interface that can be used to provide size estimates based on data structures held in memory for an object instance.
+ */
+public interface MemoryEstimate {
+  /**
+   * Returns estimated memory size based {@link org.apache.hadoop.hive.ql.util.JavaDataModel}
+   *
+   * @return estimated memory size in bytes
+   */
+  long getEstimatedMemorySize();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index ea8485d..84398c6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3161,6 +3161,10 @@ public class HiveConf extends Configuration {
         "hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" +
         "conversion decision will be made). This is only an upper bound. Lower bound is determined by number of\n" +
         "executors and configured max concurrency."),
+    LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL("hive.llap.mapjoin.memory.monitor.check.interval", 100000L,
+      "Check memory usage of mapjoin hash tables after every interval of this many rows. If map join hash table\n" +
+        "memory usage exceeds (hive.auto.convert.join.noconditionaltask.size * hive.hash.table.inflation.factor)\n" +
+        "when running in LLAP, tasks will get killed and not retried. Set the value to 0 to disable this feature."),
     LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4,
         "Maximum number of threads to be used for AM reporter. If this is lower than number of\n" +
         "executors in llap daemon, it would be set to number of executors at runtime.",

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
index ff6e7ce..6cf8dbb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
@@ -116,7 +116,7 @@ public class IncrementalObjectSizeEstimator {
           addToProcessing(byType, stack, fieldObj, fieldClass);
         }
       }
-      estimator.directSize = JavaDataModel.alignUp(
+      estimator.directSize = (int) JavaDataModel.alignUp(
           estimator.directSize, memoryModel.memoryAlign());
     }
   }
@@ -454,7 +454,7 @@ public class IncrementalObjectSizeEstimator {
           if (len != 0) {
             int elementSize = getPrimitiveSize(e.field.getType().getComponentType());
             arraySize += elementSize * len;
-            arraySize = JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign());
+            arraySize = (int) JavaDataModel.alignUp(arraySize, memoryModel.memoryAlign());
           }
           referencedSize += arraySize;
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
index 4393c3b..46cbb5b 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFAvg.txt
@@ -471,7 +471,7 @@ public class <ClassName> extends VectorAggregateExpression {
   }     
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
     JavaDataModel model = JavaDataModel.get();
     return JavaDataModel.alignUp(
       model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
index 7468c2f..2261e1b 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMax.txt
@@ -442,7 +442,7 @@ public class <ClassName> extends VectorAggregateExpression {
     }
 
     @Override
-    public int getAggregationBufferFixedSize() {
+    public long getAggregationBufferFixedSize() {
     JavaDataModel model = JavaDataModel.get();
     return JavaDataModel.alignUp(
       model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
index 57b7ea5..58d2d22 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxDecimal.txt
@@ -458,7 +458,7 @@ public class <ClassName> extends VectorAggregateExpression {
     }
 
     @Override
-    public int getAggregationBufferFixedSize() {
+    public long getAggregationBufferFixedSize() {
     JavaDataModel model = JavaDataModel.get();
     return JavaDataModel.alignUp(
       model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt
index 749e97e..515692e 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxIntervalDayTime.txt
@@ -441,7 +441,7 @@ public class <ClassName> extends VectorAggregateExpression {
     }
 
     @Override
-    public int getAggregationBufferFixedSize() {
+    public long getAggregationBufferFixedSize() {
     JavaDataModel model = JavaDataModel.get();
     return JavaDataModel.alignUp(
       model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
index 9dfc147..c210e4c 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxString.txt
@@ -81,7 +81,7 @@ public class <ClassName> extends VectorAggregateExpression {
       @Override
       public int getVariableSize() {
         JavaDataModel model = JavaDataModel.get();
-        return model.lengthForByteArrayOfSize(bytes.length);
+        return (int) model.lengthForByteArrayOfSize(bytes.length);
       }
 
       @Override
@@ -388,7 +388,7 @@ public class <ClassName> extends VectorAggregateExpression {
     }
 
     @Override
-    public int getAggregationBufferFixedSize() {
+    public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
index 32ecb34..074aefd 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
@@ -443,7 +443,7 @@ public class <ClassName> extends VectorAggregateExpression {
     }
 
     @Override
-    public int getAggregationBufferFixedSize() {
+    public long getAggregationBufferFixedSize() {
     JavaDataModel model = JavaDataModel.get();
     return JavaDataModel.alignUp(
       model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
index bd0f14d..a89ae0a 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFSum.txt
@@ -433,7 +433,7 @@ public class <ClassName> extends VectorAggregateExpression {
     }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object(),

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
index dc9d4b1..1e3516b 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVar.txt
@@ -513,7 +513,7 @@ public class <ClassName> extends VectorAggregateExpression {
     }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
index 01062a9..b3ec7e9 100644
--- a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFVarDecimal.txt
@@ -467,7 +467,7 @@ public class <ClassName> extends VectorAggregateExpression {
     }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +
@@ -488,4 +488,4 @@ public class <ClassName> extends VectorAggregateExpression {
   public void setInputExpression(VectorExpression inputExpression) {
     this.inputExpression = inputExpression;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
new file mode 100644
index 0000000..4ad4f98
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+/**
+ * When this Error is thrown, better not retry.
+ */
+public class MapJoinMemoryExhaustionError extends Error {
+  private static final long serialVersionUID = 3678353959830506881L;
+  public MapJoinMemoryExhaustionError(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
deleted file mode 100644
index dbe00b6..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.mapjoin;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-
-
-public class MapJoinMemoryExhaustionException extends HiveException {
-  private static final long serialVersionUID = 3678353959830506881L;
-  public MapJoinMemoryExhaustionException(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
index 7fc3226..d5e81e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
@@ -86,17 +86,17 @@ public class MapJoinMemoryExhaustionHandler {
    *
    * @param tableContainerSize currently table container size
    * @param numRows number of rows processed
-   * @throws MapJoinMemoryExhaustionException
+   * @throws MapJoinMemoryExhaustionError
    */
   public void checkMemoryStatus(long tableContainerSize, long numRows)
-  throws MapJoinMemoryExhaustionException {
+  throws MapJoinMemoryExhaustionError {
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
     double percentage = (double) usedMemory / (double) maxHeapSize;
     String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
         + tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage);
     console.printInfo(msg);
     if(percentage > maxMemoryUsage) {
-      throw new MapJoinMemoryExhaustionException(msg);
+      throw new MapJoinMemoryExhaustionError(msg);
     }
    }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 595d1bd..c5d4f9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -69,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -385,7 +384,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
           + Utilities.showTime(elapsed) + " sec.");
     } catch (Throwable throwable) {
       if (throwable instanceof OutOfMemoryError
-          || (throwable instanceof MapJoinMemoryExhaustionException)) {
+          || (throwable instanceof MapJoinMemoryExhaustionError)) {
         l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
         return 3;
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 04e24bd..360b639 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -46,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
  * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked
  * and there's very little in common left save for quadratic probing (and that with some changes).
  */
-public final class BytesBytesMultiHashMap {
+public final class BytesBytesMultiHashMap implements MemoryEstimate {
   public static final Logger LOG = LoggerFactory.getLogger(BytesBytesMultiHashMap.class);
 
   /*
@@ -521,7 +523,18 @@ public final class BytesBytesMultiHashMap {
    * @return number of bytes
    */
   public long memorySize() {
-    return writeBuffers.size() + refs.length * 8 + 100;
+    return getEstimatedMemorySize();
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long size = 0;
+    size += writeBuffers.getEstimatedMemorySize();
+    size += jdm.lengthForLongArrayOfSize(refs.length);
+    // 11 primitive1 fields, 2 refs above with alignment
+    size += JavaDataModel.alignUp(15 * jdm.primitive1(), jdm.memoryAlign());
+    return size;
   }
 
   public void seal() {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
index a3bccc6..adf1a90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.io.Writable;
 public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable {
   private static final long serialVersionUID = 1L;
   protected static final Logger LOG = LoggerFactory.getLogger(HashMapWrapper.class);
-
+  private static final long DEFAULT_HASHMAP_ENTRY_SIZE = 1024L;
   // default threshold for using main memory based HashMap
   private static final int THRESHOLD = 1000000;
   private static final float LOADFACTOR = 0.75f;
@@ -140,6 +140,14 @@ public class HashMapWrapper extends AbstractMapJoinTableContainer implements Ser
     return new GetAdaptor(keyTypeFromLoader);
   }
 
+  @Override
+  public long getEstimatedMemorySize() {
+    // TODO: Key and Values are Object[] which can be eagerly deserialized or lazily deserialized. To accurately
+    // estimate the entry size, every possible Objects in Key, Value should implement MemoryEstimate interface which
+    // is very intrusive. So assuming default entry size here.
+    return size() * DEFAULT_HASHMAP_ENTRY_SIZE;
+  }
+
   private class GetAdaptor implements ReusableGetAdaptor {
 
     private Object[] currentKey;

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 04e89e8..6523f00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -118,6 +118,11 @@ public class HybridHashTableContainer
 
   private final String spillLocalDirs;
 
+  @Override
+  public long getEstimatedMemorySize() {
+    return memoryUsed;
+  }
+
   /**
    * This class encapsulates the triplet together since they are closely related to each other
    * The triplet: hashmap (either in memory or on disk), small table container, big table container

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index c86e5f5..014d17a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -72,6 +74,11 @@ public class MapJoinBytesTableContainer
          implements MapJoinTableContainer, MapJoinTableContainerDirectAccess {
   private static final Logger LOG = LoggerFactory.getLogger(MapJoinTableContainer.class);
 
+  // TODO: For object inspector fields, assigning 16KB for now. To better estimate the memory size every
+  // object inspectors have to implement MemoryEstimate interface which is a lot of change with little benefit compared
+  // to writing an instrumentation agent for object size estimation
+  public static final long DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE = 16 * 1024L;
+
   private final BytesBytesMultiHashMap hashMap;
   /** The OI used to deserialize values. We never deserialize keys. */
   private LazyBinaryStructObjectInspector internalValueOi;
@@ -147,7 +154,7 @@ public class MapJoinBytesTableContainer
     this.notNullMarkers = notNullMarkers;
   }
 
-  public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource {
+  public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource, MemoryEstimate {
     void setKeyValue(Writable key, Writable val) throws SerDeException;
     /** Get hash value from the key. */
     int getHashFromKey() throws SerDeException;
@@ -216,6 +223,22 @@ public class MapJoinBytesTableContainer
     public int getHashFromKey() throws SerDeException {
       throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer");
     }
+
+    @Override
+    public long getEstimatedMemorySize() {
+      JavaDataModel jdm = JavaDataModel.get();
+      long size = 0;
+      size += keySerDe == null ? 0 : jdm.object();
+      size += valSerDe == null ? 0 : jdm.object();
+      size += keySoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+      size += valSoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+      size += keyOis == null ? 0 : jdm.arrayList() + keyOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+      size += valOis == null ? 0 : jdm.arrayList() + valOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+      size += keyObjs == null ? 0 : jdm.array() + keyObjs.length * jdm.object();
+      size += valObjs == null ? 0 : jdm.array() + valObjs.length * jdm.object();
+      size += jdm.primitive1();
+      return size;
+    }
   }
 
   static class LazyBinaryKvWriter implements KeyValueHelper {
@@ -319,6 +342,15 @@ public class MapJoinBytesTableContainer
       aliasFilter &= filterGetter.getShort();
       return aliasFilter;
     }
+
+    @Override
+    public long getEstimatedMemorySize() {
+      JavaDataModel jdm = JavaDataModel.get();
+      long size = 0;
+      size += (4 * jdm.object());
+      size += jdm.primitive1();
+      return size;
+    }
   }
 
   /*
@@ -361,6 +393,15 @@ public class MapJoinBytesTableContainer
       int keyLength = key.getLength();
       return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
     }
+
+    @Override
+    public long getEstimatedMemorySize() {
+      JavaDataModel jdm = JavaDataModel.get();
+      long size = 0;
+      size += jdm.object() + (key == null ? 0 : key.getCapacity());
+      size += jdm.object() + (val == null ? 0 : val.getCapacity());
+      return size;
+    }
   }
 
   @Override
@@ -768,4 +809,19 @@ public class MapJoinBytesTableContainer
   public int size() {
     return hashMap.size();
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long size = 0;
+    size += hashMap.getEstimatedMemorySize();
+    size += directWriteHelper == null ? 0 : directWriteHelper.getEstimatedMemorySize();
+    size += writeHelper == null ? 0 : writeHelper.getEstimatedMemorySize();
+    size += sortableSortOrders == null ? 0 : jdm.lengthForBooleanArrayOfSize(sortableSortOrders.length);
+    size += nullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(nullMarkers.length);
+    size += notNullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(notNullMarkers.length);
+    size += jdm.arrayList(); // empty list
+    size += DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
index 6d71fef..5ca5ff6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -31,7 +32,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
 
-public interface MapJoinTableContainer {
+public interface MapJoinTableContainer extends MemoryEstimate {
   /**
    * Retrieve rows from hashtable key by key, one key at a time, w/o copying the structures
    * for each key. "Old" HashMapWrapper will still create/retrieve new objects for java HashMap;

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 7b13e90..7011d23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -146,7 +147,20 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       }
       nwayConf.setNumberOfPartitions(numPartitions);
     }
-
+    final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+    final long memoryCheckInterval = HiveConf.getLongVar(hconf,
+      HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+    final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+    long numEntries = 0;
+    long noCondTaskSize = desc.getNoConditionalTaskSize();
+    boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+    if (!doMemCheck) {
+      LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
+        "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
+    } else {
+      LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
+        noCondTaskSize, inflationFactor);
+    }
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
@@ -205,12 +219,32 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
           tableContainer = new HashMapWrapper(hconf, keyCount);
         }
 
-        LOG.info("Using tableContainer " + tableContainer.getClass().getSimpleName());
+        LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName());
 
         tableContainer.setSerde(keyCtx, valCtx);
         while (kvReader.next()) {
-          tableContainer.putRow(
-              (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue());
+          tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue());
+          numEntries++;
+          if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+            final long estMemUsage = tableContainer.getEstimatedMemorySize();
+            final long threshold = (long) (inflationFactor * noCondTaskSize);
+            // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
+            // available for container/executor
+            final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits." +
+                " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
+                " inflationFactor: " + inflationFactor + " threshold: " + threshold +
+                " effectiveThreshold: " + effectiveThreshold;
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            } else {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " +
+                  "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+              }
+            }
+          }
         }
         tableContainer.seal();
         LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos);

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 486d43a..4242262 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,8 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
+import com.google.common.base.Throwables;
+
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -189,8 +193,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     } catch (Throwable t) {
       originalThrowable = t;
     } finally {
-      if (originalThrowable != null && originalThrowable instanceof Error) {
-        LOG.error(StringUtils.stringifyException(originalThrowable));
+      if (originalThrowable != null && (originalThrowable instanceof Error ||
+        Throwables.getRootCause(originalThrowable) instanceof Error)) {
+        LOG.error("Cannot recover from this FATAL error", StringUtils.stringifyException(originalThrowable));
+        getContext().reportFailure(TaskFailureType.FATAL, originalThrowable,
+                      "Cannot recover from this error");
         throw new RuntimeException(originalThrowable);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
index 630046d..84128e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferBatch.java
@@ -57,7 +57,7 @@ public class VectorAggregationBufferBatch {
   /**
    * Memory consumed by a set of aggregation buffers
    */
-  private int aggregatorsFixedSize;
+  private long aggregatorsFixedSize;
 
   /**
    * Array of indexes for aggregators that have variable size
@@ -76,7 +76,7 @@ public class VectorAggregationBufferBatch {
    * Returns the fixed size consumed by the aggregation buffers
    * @return
    */
-  public int getAggregatorsFixedSize() {
+  public long getAggregatorsFixedSize() {
     return aggregatorsFixedSize;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 5b4c7c3..30916a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -286,7 +286,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
     /**
      * Total per hashtable entry fixed memory (does not depend on key/agg values).
      */
-    private int fixedHashEntrySize;
+    private long fixedHashEntrySize;
 
     /**
      * Average per hashtable entry variable size memory (depends on key/agg value).

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
index 0866f63..7ab4473 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorAggregateExpression.java
@@ -52,7 +52,7 @@ public abstract class VectorAggregateExpression  implements Serializable {
   public abstract Object evaluateOutput(AggregationBuffer agg) throws HiveException;
 
   public abstract ObjectInspector getOutputObjectInspector();
-  public abstract int getAggregationBufferFixedSize();
+  public abstract long getAggregationBufferFixedSize();
   public boolean hasVariableSize() {
     return false;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
index 74e25ae..4aac9d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
@@ -492,7 +492,7 @@ public class VectorUDAFAvgDecimal extends VectorAggregateExpression {
   }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
     JavaDataModel model = JavaDataModel.get();
     return JavaDataModel.alignUp(
       model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java
index 483d9dc..365dcf6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgTimestamp.java
@@ -464,7 +464,7 @@ public class VectorUDAFAvgTimestamp extends VectorAggregateExpression {
   }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
     JavaDataModel model = JavaDataModel.get();
     return JavaDataModel.alignUp(
       model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
index 2139eae..52b05ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilter.java
@@ -383,7 +383,7 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression {
   }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
     if (bitSetSize < 0) {
       // Not pretty, but we need a way to get the size
       try {
@@ -396,7 +396,7 @@ public class VectorUDAFBloomFilter extends VectorAggregateExpression {
 
     // BloomFilter: object(BitSet: object(data: long[]), numBits: int, numHashFunctions: int)
     JavaDataModel model = JavaDataModel.get();
-    int bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize),
+    long bloomFilterSize = JavaDataModel.alignUp(model.object() + model.lengthForLongArrayOfSize(bitSetSize),
         model.memoryAlign());
     return JavaDataModel.alignUp(
         model.object() + bloomFilterSize + model.primitive1() + model.primitive1(),

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
index d2446d5..b986eb4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFBloomFilterMerge.java
@@ -339,7 +339,7 @@ public class VectorUDAFBloomFilterMerge extends VectorAggregateExpression {
   }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
     if (aggBufferSize < 0) {
       // Not pretty, but we need a way to get the size
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
index 494febc..cadb6dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCount.java
@@ -259,7 +259,7 @@ public class VectorUDAFCount extends VectorAggregateExpression {
     }
 
     @Override
-    public int getAggregationBufferFixedSize() {
+    public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java
index dec88cb..c489f8f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountMerge.java
@@ -385,7 +385,7 @@ public class VectorUDAFCountMerge extends VectorAggregateExpression {
     }
 
     @Override
-    public int getAggregationBufferFixedSize() {
+    public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
index 337ba0a..3b66030 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFCountStar.java
@@ -142,7 +142,7 @@ public class VectorUDAFCountStar extends VectorAggregateExpression {
     }
 
     @Override
-    public int getAggregationBufferFixedSize() {
+    public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java
index 8cd3506..5388d37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdPopTimestamp.java
@@ -508,7 +508,7 @@ public class VectorUDAFStdPopTimestamp extends VectorAggregateExpression {
     }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java
index 61d6977..1769dc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFStdSampTimestamp.java
@@ -508,7 +508,7 @@ public class VectorUDAFStdSampTimestamp extends VectorAggregateExpression {
     }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
index b10f66f..a37e3f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
@@ -431,7 +431,7 @@ public class VectorUDAFSumDecimal extends VectorAggregateExpression {
     }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object(),

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java
index 2709b07..61cdeaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarPopTimestamp.java
@@ -508,7 +508,7 @@ public class VectorUDAFVarPopTimestamp extends VectorAggregateExpression {
     }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java
index 03dce1e..c375461 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFVarSampTimestamp.java
@@ -508,7 +508,7 @@ public class VectorUDAFVarSampTimestamp extends VectorAggregateExpression {
     }
 
   @Override
-  public int getAggregationBufferFixedSize() {
+  public long getAggregationBufferFixedSize() {
       JavaDataModel model = JavaDataModel.get();
       return JavaDataModel.alignUp(
         model.object() +

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
index 6242daf..b5eab8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMap.java
@@ -107,4 +107,9 @@ public abstract class VectorMapJoinFastBytesHashMap
     // Share the same write buffers with our value store.
     keyStore = new VectorMapJoinFastKeyStore(valueStore.writeBuffers());
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
index 1a41961..e779762 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashMultiSet.java
@@ -97,4 +97,9 @@ public abstract class VectorMapJoinFastBytesHashMultiSet
 
     keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
index 331867c..d493319 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashSet.java
@@ -84,4 +84,9 @@ public abstract class VectorMapJoinFastBytesHashSet
 
     keyStore = new VectorMapJoinFastKeyStore(writeBuffersSize);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize() + keyStore.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
index b93e977..10bc902 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastBytesHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashTable;
@@ -218,4 +219,9 @@ public abstract class VectorMapJoinFastBytesHashTable
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     allocateBucketArray();
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize() + JavaDataModel.get().lengthForLongArrayOfSize(slotTriples.length);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index b6db3bc..1f182ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
@@ -88,4 +89,10 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
   public int size() {
     return keysAssigned;
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    return JavaDataModel.alignUp(10L * jdm.primitive1() + jdm.primitive2(), jdm.memoryAlign());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index 49ecdd1..b015e43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.exec.tez.TezContext;
@@ -68,6 +68,21 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
     Map<Integer, String> parentToInput = desc.getParentToInput();
     Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
 
+    final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+    final long memoryCheckInterval = HiveConf.getLongVar(hconf,
+      HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+    final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+    long numEntries = 0;
+    long noCondTaskSize = desc.getNoConditionalTaskSize();
+    boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+    if (!doMemCheck) {
+      LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
+          "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
+    } else {
+      LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
+        noCondTaskSize, inflationFactor);
+    }
+
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
@@ -93,15 +108,41 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
                 new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
 
+        LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName());
+
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
         while (kvReader.next()) {
           vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
               (BytesWritable)kvReader.getCurrentValue());
+          numEntries++;
+          if (doMemCheck && numEntries >= memoryCheckInterval) {
+            if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+              final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
+              final long threshold = (long) (inflationFactor * noCondTaskSize);
+              // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
+              // available for container/executor
+              final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
+              if (estMemUsage > effectiveThreshold) {
+                String msg = "VectorMapJoin Hash table loading exceeded memory limits." +
+                  " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
+                  " inflationFactor: " + inflationFactor + " threshold: " + threshold +
+                  " effectiveThreshold: " + effectiveThreshold;
+                LOG.error(msg);
+                throw new MapJoinMemoryExhaustionError(msg);
+              } else {
+                if (LOG.isInfoEnabled()) {
+                  LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " +
+                    "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+                }
+              }
+            }
+          }
         }
 
         vectorMapJoinFastTableContainer.seal();
-        mapJoinTables[pos] = (MapJoinTableContainer) vectorMapJoinFastTableContainer;
-
+        mapJoinTables[pos] = vectorMapJoinFastTableContainer;
+        LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() +
+          ". Small table position: " + pos);
       } catch (IOException e) {
         throw new HiveException(e);
       } catch (SerDeException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
index be51693..3e9ff84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastKeyStore.java
@@ -18,13 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.serde2.WriteBuffers;
 
 // Optimized for sequential key lookup.
 
-public class VectorMapJoinFastKeyStore {
+public class VectorMapJoinFastKeyStore implements MemoryEstimate {
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastKeyStore.class.getName());
 
@@ -165,4 +166,12 @@ public class VectorMapJoinFastKeyStore {
     this.writeBuffers = writeBuffers;
     unsafeReadPos = new WriteBuffers.Position();
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long size = 0;
+    size += writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+    size += unsafeReadPos == null ? 0 : unsafeReadPos.getEstimatedMemorySize();
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
index 6fe98f9..d4847b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMap.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -37,7 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 public class VectorMapJoinFastLongHashMap
              extends VectorMapJoinFastLongHashTable
-             implements VectorMapJoinLongHashMap {
+             implements VectorMapJoinLongHashMap, MemoryEstimate {
 
   public static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastLongHashMap.class);
 
@@ -112,4 +114,9 @@ public class VectorMapJoinFastLongHashMap
         initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     valueStore = new VectorMapJoinFastValueStore(writeBuffersSize);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize() + valueStore.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
index 9140aee..566cfa2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashMultiSet.java
@@ -100,4 +100,9 @@ public class VectorMapJoinFastLongHashMultiSet
     super(minMaxEnabled, isOuterJoin, hashTableKeyType,
         initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
index d3efb11..fb7ae62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashSet.java
@@ -96,4 +96,9 @@ public class VectorMapJoinFastLongHashSet
     super(minMaxEnabled, isOuterJoin, hashTableKeyType,
         initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 8bfa07c..54e667c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -280,4 +281,18 @@ public abstract class VectorMapJoinFastLongHashTable
     min = Long.MAX_VALUE;
     max = Long.MIN_VALUE;
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long size = super.getEstimatedMemorySize();
+    size += slotPairs == null ? 0 : jdm.lengthForLongArrayOfSize(slotPairs.length);
+    size += (2 * jdm.primitive2());
+    size += (2 * jdm.primitive1());
+    size += jdm.object();
+    // adding 16KB constant memory for keyBinarySortableDeserializeRead as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    size += (16 * 1024L);
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
index add4788..eb08aa9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMap.java
@@ -53,4 +53,9 @@ public class VectorMapJoinFastMultiKeyHashMap
         int initialCapacity, float loadFactor, int writeBuffersSize, long estimatedKeyCount) {
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
index faefdbb..56964bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashMultiSet.java
@@ -52,4 +52,8 @@ public class VectorMapJoinFastMultiKeyHashMultiSet
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
 
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
index 5328910..46bafe0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastMultiKeyHashSet.java
@@ -52,5 +52,8 @@ public class VectorMapJoinFastMultiKeyHashSet
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
   }
 
-
+  @Override
+  public long getEstimatedMemorySize() {
+    return super.getEstimatedMemorySize();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
index f13034f..d04590a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMap.java
@@ -43,4 +43,13 @@ public class VectorMapJoinFastStringHashMap extends VectorMapJoinFastBytesHashMa
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    long size = 0;
+    // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    size += (16 * 1024L);
+    return super.getEstimatedMemorySize() + size;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
index 53ad7b4..b24bfdf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashMultiSet.java
@@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashMultiSet extends VectorMapJoinFastBytesH
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    long size = (16 * 1024L);
+    return super.getEstimatedMemorySize() + size;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
index 723c729..75fae25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringHashSet.java
@@ -43,4 +43,12 @@ public class VectorMapJoinFastStringHashSet extends VectorMapJoinFastBytesHashSe
     super(initialCapacity, loadFactor, writeBuffersSize, estimatedKeyCount);
     stringCommon = new VectorMapJoinFastStringCommon(isOuterJoin);
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    // adding 16KB constant memory for stringCommon as the rabit hole is deep to implement
+    // MemoryEstimate interface, also it is constant overhead
+    long size = (16 * 1024L);
+    return super.getEstimatedMemorySize() + size;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 05f1cf1..2fe4b93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -26,7 +27,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
-import org.apache.hadoop.hive.ql.exec.tez.HashTableLoader;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -38,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
  * HashTableLoader for Tez constructs the hashtable from records read from
@@ -46,7 +45,7 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
  */
 public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContainer {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HashTableLoader.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastTableContainer.class.getName());
 
   private final MapJoinDesc desc;
   private final Configuration hconf;
@@ -219,6 +218,17 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
   }
 
   @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long size = 0;
+    size += vectorMapJoinFastHashTable.getEstimatedMemorySize();
+    size += (4 * jdm.primitive1());
+    size += (2 * jdm.object());
+    size += (jdm.primitive2());
+    return size;
+  }
+
+  @Override
   public void setSerde(MapJoinObjectSerDeContext keyCtx, MapJoinObjectSerDeContext valCtx)
       throws SerDeException {
     // Do nothing in this case.

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
index f9c5b34..3cd06e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastValueStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;
 
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
@@ -30,7 +31,7 @@ import com.google.common.base.Preconditions;
 
 // Supports random access.
 
-public class VectorMapJoinFastValueStore {
+public class VectorMapJoinFastValueStore implements MemoryEstimate {
 
   private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinFastValueStore.class.getName());
 
@@ -113,6 +114,11 @@ public class VectorMapJoinFastValueStore {
     return writeBuffers;
   }
 
+  @Override
+  public long getEstimatedMemorySize() {
+    return writeBuffers == null ? 0 : writeBuffers.getEstimatedMemorySize();
+  }
+
   public static class HashMapResult extends VectorMapJoinHashMapResult {
 
     private VectorMapJoinFastValueStore valueStore;

http://git-wip-us.apache.org/repos/asf/hive/blob/0ffff404/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
index c7e585c..9cc9ad4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.BytesWritable;
@@ -28,7 +29,7 @@ import org.apache.hadoop.io.BytesWritable;
  * Root interface for a vector map join hash table (which could be a hash map, hash multi-set, or
  * hash set).
  */
-public interface VectorMapJoinHashTable {
+public interface VectorMapJoinHashTable extends MemoryEstimate {
 
 
   /*