You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/05/17 06:14:37 UTC

hive git commit: HIVE-16582: HashTableLoader should log info about the input, rows, size etc. (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 045b8da50 -> 5a0b42537


HIVE-16582: HashTableLoader should log info about the input, rows, size etc. (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5a0b4253
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5a0b4253
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5a0b4253

Branch: refs/heads/master
Commit: 5a0b42537f97e76430bcf0a62c9c6fa26c4b1e01
Parents: 045b8da
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue May 16 23:14:26 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue May 16 23:14:26 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/MapJoinOperator.java    |   4 +
 .../hadoop/hive/ql/exec/MemoryMonitorInfo.java  | 166 +++++++++++++++++++
 .../hive/ql/exec/SerializationUtilities.java    |   1 +
 .../hive/ql/exec/tez/HashTableLoader.java       |  68 +++++---
 .../fast/VectorMapJoinFastHashTableLoader.java  |  73 ++++----
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  48 +++---
 .../hive/ql/optimizer/MapJoinProcessor.java     |   4 +-
 .../hive/ql/optimizer/SharedScanOptimizer.java  |   4 +-
 .../calcite/translator/HiveOpConverter.java     |   2 +-
 .../physical/GenMRSkewJoinProcessor.java        |   2 +-
 .../physical/GenSparkSkewJoinProcessor.java     |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   2 +-
 .../apache/hadoop/hive/ql/plan/JoinDesc.java    |  23 +--
 .../apache/hadoop/hive/ql/plan/MapJoinDesc.java |   5 +-
 .../hadoop/hive/ql/exec/TestOperators.java      |  57 ++-----
 .../clientpositive/llap/tez_smb_main.q.out      |  18 +-
 16 files changed, 328 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 07aa2ea..4971707 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -132,6 +132,10 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
     return HashTableLoaderFactory.getLoader(hconf);
   }
 
+  public String getCacheKey() {
+    return cacheKey;
+  }
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     this.hconf = hconf;

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java
new file mode 100644
index 0000000..24c5799
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+
+/**
+ * Contains information required for memory usage monitoring.
+ **/
+
+public class MemoryMonitorInfo implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // Variables for LLAP hash table loading memory monitor
+  private boolean isLlap;
+  private int executorsPerNode;
+  private int maxExecutorsOverSubscribeMemory;
+  private double memoryOverSubscriptionFactor;
+  private long noConditionalTaskSize;
+  private long adjustedNoConditionalTaskSize;
+  private long memoryCheckInterval;
+  private double hashTableInflationFactor;
+  private long threshold;
+
+  public MemoryMonitorInfo() {
+  }
+
+  public MemoryMonitorInfo(boolean isLlap, int executorsPerNode, int maxExecutorsOverSubscribeMemory,
+    double memoryOverSubscriptionFactor, long noConditionalTaskSize, long adjustedNoConditionalTaskSize,
+    long memoryCheckInterval, double hashTableInflationFactor) {
+    this.isLlap = isLlap;
+    this.executorsPerNode = executorsPerNode;
+    this.maxExecutorsOverSubscribeMemory = maxExecutorsOverSubscribeMemory;
+    this.memoryOverSubscriptionFactor = memoryOverSubscriptionFactor;
+    this.noConditionalTaskSize = noConditionalTaskSize;
+    this.adjustedNoConditionalTaskSize = adjustedNoConditionalTaskSize;
+    this.memoryCheckInterval = memoryCheckInterval;
+    this.hashTableInflationFactor = hashTableInflationFactor;
+    this.threshold = (long) (hashTableInflationFactor * adjustedNoConditionalTaskSize);
+  }
+
+  public MemoryMonitorInfo(MemoryMonitorInfo memoryMonitorInfo) {
+    this.isLlap = memoryMonitorInfo.isLlap;
+    this.executorsPerNode = memoryMonitorInfo.executorsPerNode;
+    this.maxExecutorsOverSubscribeMemory = memoryMonitorInfo.maxExecutorsOverSubscribeMemory;
+    this.memoryOverSubscriptionFactor = memoryMonitorInfo.memoryOverSubscriptionFactor;
+    this.noConditionalTaskSize = memoryMonitorInfo.noConditionalTaskSize;
+    this.adjustedNoConditionalTaskSize = memoryMonitorInfo.adjustedNoConditionalTaskSize;
+    this.memoryCheckInterval = memoryMonitorInfo.memoryCheckInterval;
+    this.hashTableInflationFactor = memoryMonitorInfo.hashTableInflationFactor;
+    this.threshold = memoryMonitorInfo.threshold;
+  }
+
+  public int getExecutorsPerNode() {
+    return executorsPerNode;
+  }
+
+  public void setExecutorsPerNode(final int executorsPerNode) {
+    this.executorsPerNode = executorsPerNode;
+  }
+
+  public int getMaxExecutorsOverSubscribeMemory() {
+    return maxExecutorsOverSubscribeMemory;
+  }
+
+  public void setMaxExecutorsOverSubscribeMemory(final int maxExecutorsOverSubscribeMemory) {
+    this.maxExecutorsOverSubscribeMemory = maxExecutorsOverSubscribeMemory;
+  }
+
+  public double getMemoryOverSubscriptionFactor() {
+    return memoryOverSubscriptionFactor;
+  }
+
+  public void setMemoryOverSubscriptionFactor(final double memoryOverSubscriptionFactor) {
+    this.memoryOverSubscriptionFactor = memoryOverSubscriptionFactor;
+  }
+
+  public long getNoConditionalTaskSize() {
+    return noConditionalTaskSize;
+  }
+
+  public void setNoConditionalTaskSize(final long noConditionalTaskSize) {
+    this.noConditionalTaskSize = noConditionalTaskSize;
+  }
+
+  public long getAdjustedNoConditionalTaskSize() {
+    return adjustedNoConditionalTaskSize;
+  }
+
+  public void setAdjustedNoConditionalTaskSize(final long adjustedNoConditionalTaskSize) {
+    this.adjustedNoConditionalTaskSize = adjustedNoConditionalTaskSize;
+  }
+
+  public long getMemoryCheckInterval() {
+    return memoryCheckInterval;
+  }
+
+  public void setMemoryCheckInterval(final long memoryCheckInterval) {
+    this.memoryCheckInterval = memoryCheckInterval;
+  }
+
+  public double getHashTableInflationFactor() {
+    return hashTableInflationFactor;
+  }
+
+  public void setHashTableInflationFactor(final double hashTableInflationFactor) {
+    this.hashTableInflationFactor = hashTableInflationFactor;
+  }
+
+  public long getThreshold() {
+    return threshold;
+  }
+
+  public void setLlap(final boolean llap) {
+    isLlap = llap;
+  }
+
+  public boolean isLlap() {
+    return isLlap;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+    sb.append(" isLlap: ").append(isLlap);
+    sb.append(" executorsPerNode: ").append(executorsPerNode);
+    sb.append(" maxExecutorsOverSubscribeMemory: ").append(maxExecutorsOverSubscribeMemory);
+    sb.append(" memoryOverSubscriptionFactor: ").append(memoryOverSubscriptionFactor);
+    sb.append(" memoryCheckInterval: ").append(memoryCheckInterval);
+    sb.append(" noConditionalTaskSize: ").append(noConditionalTaskSize);
+    sb.append(" adjustedNoConditionalTaskSize: ").append(adjustedNoConditionalTaskSize);
+    sb.append(" hashTableInflationFactor: ").append(hashTableInflationFactor);
+    sb.append(" threshold: ").append(threshold);
+    sb.append(" }");
+    return sb.toString();
+  }
+
+  public boolean doMemoryMonitoring() {
+    return isLlap && hashTableInflationFactor > 0.0d && noConditionalTaskSize > 0 &&
+      memoryCheckInterval > 0;
+  }
+
+  public long getEffectiveThreshold(final long maxMemoryPerExecutor) {
+    // Guard against a poorly configured (too small) noconditional task size: the hash table may always grow
+    // to at least 2/3 of the memory available to the container/executor.
+    return (long) Math.max(threshold, (2.0 / 3.0) * maxMemoryPerExecutor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index a29dd85..8902f6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -250,6 +250,7 @@ public class SerializationUtilities {
       kryo.register(SparkEdgeProperty.class);
       kryo.register(SparkWork.class);
       kryo.register(Pair.class);
+      kryo.register(MemoryMonitorInfo.class);
 
       // This must be called after all the explicit register calls.
       return kryo.processHooks(kryoTypeHooks, globalHook);

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 7011d23..5bb9d7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +65,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
   private Configuration hconf;
   private MapJoinDesc desc;
   private TezContext tezContext;
+  private String cacheKey;
 
   @Override
   public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
@@ -70,6 +73,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
     this.tezContext = (TezContext) mrContext;
     this.hconf = hconf;
     this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
   }
 
   @Override
@@ -147,25 +151,36 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       }
       nwayConf.setNumberOfPartitions(numPartitions);
     }
-    final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
-    final long memoryCheckInterval = HiveConf.getLongVar(hconf,
-      HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
-    final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
-    long numEntries = 0;
-    long noCondTaskSize = desc.getNoConditionalTaskSize();
-    boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+    MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+    boolean doMemCheck = false;
+    long effectiveThreshold = 0;
+    if (memoryMonitorInfo != null) {
+      effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+      // Hash table loading happens on the server side, and LlapDecider could kick some fragments out to run
+      // outside of LLAP. Flip the flag at runtime if we are actually running outside of LLAP.
+      if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+        memoryMonitorInfo.setLlap(false);
+      }
+      if (memoryMonitorInfo.doMemoryMonitoring()) {
+        doMemCheck = true;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo);
+        }
+      }
+    }
+
     if (!doMemCheck) {
-      LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
-        "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
-    } else {
-      LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
-        noCondTaskSize, inflationFactor);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo);
+      }
     }
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
       }
 
+      long numEntries = 0;
       String inputName = parentToInput.get(pos);
       LogicalInput input = tezContext.getInput(inputName);
 
@@ -219,36 +234,39 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
           tableContainer = new HashMapWrapper(hconf, keyCount);
         }
 
-        LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName());
+        LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName,
+          cacheKey, tableContainer.getClass().getSimpleName(), pos);
 
         tableContainer.setSerde(keyCtx, valCtx);
         while (kvReader.next()) {
           tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue());
           numEntries++;
-          if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
             final long estMemUsage = tableContainer.getEstimatedMemorySize();
-            final long threshold = (long) (inflationFactor * noCondTaskSize);
-            // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
-            // available for container/executor
-            final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
             if (estMemUsage > effectiveThreshold) {
-              String msg = "Hash table loading exceeded memory limits." +
-                " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
-                " inflationFactor: " + inflationFactor + " threshold: " + threshold +
-                " effectiveThreshold: " + effectiveThreshold;
+              String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+                " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
               LOG.error(msg);
               throw new MapJoinMemoryExhaustionError(msg);
             } else {
               if (LOG.isInfoEnabled()) {
-                LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " +
-                  "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+                LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                  "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+                  effectiveThreshold);
               }
             }
           }
         }
         tableContainer.seal();
-        LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos);
         mapJoinTables[pos] = tableContainer;
+        if (doMemCheck) {
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} estimatedMemoryUsage: {}",
+            inputName, cacheKey, numEntries, tableContainer.getEstimatedMemorySize());
+        } else {
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {}", inputName, cacheKey,
+            numEntries);
+        }
       } catch (Exception e) {
         throw new HiveException(e);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index b015e43..6c1ae2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +53,7 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
   private Configuration hconf;
   protected MapJoinDesc desc;
   private TezContext tezContext;
+  private String cacheKey;
 
   @Override
   public void init(ExecMapperContext context, MapredContext mrContext,
@@ -58,6 +61,7 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
     this.tezContext = (TezContext) mrContext;
     this.hconf = hconf;
     this.desc = joinOp.getConf();
+    this.cacheKey = joinOp.getCacheKey();
   }
 
   @Override
@@ -68,26 +72,36 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
     Map<Integer, String> parentToInput = desc.getParentToInput();
     Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
 
-    final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
-    final long memoryCheckInterval = HiveConf.getLongVar(hconf,
-      HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
-    final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
-    long numEntries = 0;
-    long noCondTaskSize = desc.getNoConditionalTaskSize();
-    boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
-    if (!doMemCheck) {
-      LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
-          "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
-    } else {
-      LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
-        noCondTaskSize, inflationFactor);
+    MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+    boolean doMemCheck = false;
+    long effectiveThreshold = 0;
+    if (memoryMonitorInfo != null) {
+      effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+      // Hash table loading happens on the server side, and LlapDecider could kick some fragments out to run
+      // outside of LLAP. Flip the flag at runtime if we are actually running outside of LLAP.
+      if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+        memoryMonitorInfo.setLlap(false);
+      }
+      if (memoryMonitorInfo.doMemoryMonitoring()) {
+        doMemCheck = true;
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo);
+        }
+      }
     }
 
+    if (!doMemCheck) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo);
+      }
+    }
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
       }
 
+      long numEntries = 0;
       String inputName = parentToInput.get(pos);
       LogicalInput input = tezContext.getInput(inputName);
 
@@ -108,41 +122,42 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
         VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
                 new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
 
-        LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName());
+        LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName,
+          cacheKey, vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos);
 
         vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
         while (kvReader.next()) {
           vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
               (BytesWritable)kvReader.getCurrentValue());
           numEntries++;
-          if (doMemCheck && numEntries >= memoryCheckInterval) {
-            if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+          if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
               final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
-              final long threshold = (long) (inflationFactor * noCondTaskSize);
-              // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
-              // available for container/executor
-              final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
               if (estMemUsage > effectiveThreshold) {
-                String msg = "VectorMapJoin Hash table loading exceeded memory limits." +
-                  " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
-                  " inflationFactor: " + inflationFactor + " threshold: " + threshold +
-                  " effectiveThreshold: " + effectiveThreshold;
+                String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+                  " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+                  " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
                 LOG.error(msg);
                 throw new MapJoinMemoryExhaustionError(msg);
               } else {
                 if (LOG.isInfoEnabled()) {
-                  LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " +
-                    "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+                  LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+                      "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+                    effectiveThreshold);
                 }
               }
-            }
           }
         }
 
         vectorMapJoinFastTableContainer.seal();
         mapJoinTables[pos] = vectorMapJoinFastTableContainer;
-        LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() +
-          ". Small table position: " + pos);
+        if (doMemCheck) {
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} " +
+              "estimatedMemoryUsage: {}", inputName, cacheKey, numEntries,
+            vectorMapJoinFastTableContainer.getEstimatedMemorySize());
+        } else {
+          LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {}", inputName, cacheKey,
+            numEntries);
+        }
       } catch (IOException e) {
         throw new HiveException(e);
       } catch (SerDeException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 0eec78e..98fec77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -97,10 +98,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     JoinOperator joinOp = (JoinOperator) nd;
     long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-
     // adjust noconditional task size threshold for LLAP
-    maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf);
-    joinOp.getConf().setNoConditionalTaskSize(maxSize);
+    MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf);
+    joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo);
 
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
@@ -172,7 +172,12 @@ public class ConvertJoinMapJoin implements NodeProcessor {
   }
 
   @VisibleForTesting
-  public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf conf) {
+  public MemoryMonitorInfo getMemoryMonitorInfo(final long maxSize, final HiveConf conf) {
+    final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+    final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
+    final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+    final float inflationFactor = conf.getFloatVar(ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+    final MemoryMonitorInfo memoryMonitorInfo;
     if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
       LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
       llapInfo.initClusterInfo();
@@ -190,24 +195,23 @@ public class ConvertJoinMapJoin implements NodeProcessor {
           executorsPerNode = numExecutorsPerNodeFromCluster;
         }
       }
-      final int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
-      if (numSessions > 0) {
-        final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
-        final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
-        final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
-        final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
-        final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
-        LOG.info("No conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " +
-            "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " +
-            "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions,
-          availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize);
-        return Math.max(maxSize, llapMaxSize);
-      } else {
-        LOG.warn(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname + " returned value {}. Returning {}" +
-          " as no conditional task size for LLAP.", numSessions, maxSize);
-      }
+
+      // bounded by max executors
+      final int slotsPerQuery = Math.min(maxSlotsPerQuery, executorsPerNode);
+      final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
+      // prevents under subscription
+      final long adjustedMaxSize = Math.max(maxSize, llapMaxSize);
+      memoryMonitorInfo = new MemoryMonitorInfo(true, executorsPerNode, maxSlotsPerQuery,
+        overSubscriptionFactor, maxSize, adjustedMaxSize, memoryCheckInterval, inflationFactor);
+    } else {
+      // for non-LLAP mode most of these are not relevant. Only noConditionalTaskSize is used by shared scan optimizer.
+      memoryMonitorInfo = new MemoryMonitorInfo(false, 1, maxSlotsPerQuery, overSubscriptionFactor, maxSize, maxSize,
+        memoryCheckInterval, inflationFactor);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Memory monitor info set to : {}", memoryMonitorInfo);
     }
-    return maxSize;
+    return memoryMonitorInfo;
   }
 
   @SuppressWarnings("unchecked")
@@ -282,7 +286,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
                   null, joinDesc.getExprs(), null, null,
                   joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
                   joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null,
-                  joinDesc.getNoConditionalTaskSize(), joinDesc.getInMemoryDataSize());
+                  joinDesc.getMemoryMonitorInfo(), joinDesc.getInMemoryDataSize());
       mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
       mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
       mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs());

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index f01fb9c..d84a1e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -432,7 +432,7 @@ public class MapJoinProcessor extends Transform {
         smbJoinDesc.getOutputColumnNames(),
         bigTablePos, smbJoinDesc.getConds(),
         smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix(),
-        smbJoinDesc.getNoConditionalTaskSize(), smbJoinDesc.getInMemoryDataSize());
+        smbJoinDesc.getMemoryMonitorInfo(), smbJoinDesc.getInMemoryDataSize());
 
     mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
 
@@ -1186,7 +1186,7 @@ public class MapJoinProcessor extends Transform {
         new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
             valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters,
             op.getConf().getNoOuterJoin(), dumpFilePrefix,
-            op.getConf().getNoConditionalTaskSize(), op.getConf().getInMemoryDataSize());
+            op.getConf().getMemoryMonitorInfo(), op.getConf().getInMemoryDataSize());
     mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
     mapJoinDescriptor.setTagOrder(tagOrder);
     mapJoinDescriptor.setNullSafes(desc.getNullSafes());

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
index 5964fd4..3349fc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
@@ -402,10 +402,10 @@ public class SharedScanOptimizer extends Transform {
       if (op instanceof MapJoinOperator) {
         MapJoinOperator mop = (MapJoinOperator) op;
         dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize());
-        if (dataSize > mop.getConf().getNoConditionalTaskSize()) {
+        if (dataSize > mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize()) {
           // Size surpasses limit, we cannot convert
           LOG.debug("accumulated data size: {} / max size: {}",
-                  dataSize, mop.getConf().getNoConditionalTaskSize());
+                  dataSize, mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize());
           return false;
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index b9b600d..471675b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -1003,7 +1003,7 @@ public class HiveOpConverter {
 
     // 4. We create the join operator with its descriptor
     JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns,
-            filters, joinExpressions, 0);
+            filters, joinExpressions, null);
     desc.setReversedExprs(reversedExprs);
     desc.setFilterMap(filterMap);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 53abb21..e0ccd04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -282,7 +282,7 @@ public final class GenMRSkewJoinProcessor {
           newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor
           .getOutputColumnNames(), i, joinDescriptor.getConds(),
           joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix,
-          joinDescriptor.getNoConditionalTaskSize(), joinDescriptor.getInMemoryDataSize());
+          joinDescriptor.getMemoryMonitorInfo(), joinDescriptor.getInMemoryDataSize());
       mapJoinDescriptor.setTagOrder(tags);
       mapJoinDescriptor.setHandleSkewJoin(false);
       mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index a5f0b2a..6b9d5b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -241,7 +241,7 @@ public class GenSparkSkewJoinProcessor {
           newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc, joinDescriptor
           .getOutputColumnNames(), i, joinDescriptor.getConds(),
           joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix,
-          joinDescriptor.getNoConditionalTaskSize(), joinDescriptor.getInMemoryDataSize());
+          joinDescriptor.getMemoryMonitorInfo(), joinDescriptor.getInMemoryDataSize());
       mapJoinDescriptor.setTagOrder(tags);
       mapJoinDescriptor.setHandleSkewJoin(false);
       mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c6b67d1..eb7ef00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8125,7 +8125,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     JoinDesc desc = new JoinDesc(exprMap, outputColumnNames,
-        join.getNoOuterJoin(), joinCondns, filterMap, joinKeys, 0);
+        join.getNoOuterJoin(), joinCondns, filterMap, joinKeys, null);
     desc.setReversedExprs(reversedExprs);
     desc.setFilterMap(join.getFilterMap());
     // For outer joins, add filters that apply to more than one input

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index 12e1ff5..eae80a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -111,7 +112,7 @@ public class JoinDesc extends AbstractOperatorDesc {
   protected transient long inMemoryDataSize;
 
   // non-transient field, used at runtime to kill a task if it exceeded memory limits when running in LLAP
-  protected long noConditionalTaskSize;
+  protected MemoryMonitorInfo memoryMonitorInfo;
 
   public JoinDesc() {
   }
@@ -119,14 +120,14 @@ public class JoinDesc extends AbstractOperatorDesc {
   public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
       List<String> outputColumnNames, final boolean noOuterJoin,
       final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters,
-      ExprNodeDesc[][] joinKeys, final long noConditionalTaskSize) {
+      ExprNodeDesc[][] joinKeys, final MemoryMonitorInfo memoryMonitorInfo) {
     this.exprs = exprs;
     this.outputColumnNames = outputColumnNames;
     this.noOuterJoin = noOuterJoin;
     this.conds = conds;
     this.filters = filters;
     this.joinKeys = joinKeys;
-    this.noConditionalTaskSize = noConditionalTaskSize;
+    this.memoryMonitorInfo = memoryMonitorInfo;
     resetOrder();
   }
 
@@ -153,7 +154,9 @@ public class JoinDesc extends AbstractOperatorDesc {
     ret.setHandleSkewJoin(handleSkewJoin);
     ret.setSkewKeyDefinition(getSkewKeyDefinition());
     ret.setTagOrder(getTagOrder().clone());
-    ret.setNoConditionalTaskSize(getNoConditionalTaskSize());
+    if (getMemoryMonitorInfo() != null) {
+      ret.setMemoryMonitorInfo(new MemoryMonitorInfo(getMemoryMonitorInfo()));
+    }
     if (getKeyTableDesc() != null) {
       ret.setKeyTableDesc((TableDesc) getKeyTableDesc().clone());
     }
@@ -204,8 +207,8 @@ public class JoinDesc extends AbstractOperatorDesc {
     this.filterMap = clone.filterMap;
     this.residualFilterExprs = clone.residualFilterExprs;
     this.statistics = clone.statistics;
-    this.noConditionalTaskSize = clone.noConditionalTaskSize;
     this.inMemoryDataSize = clone.inMemoryDataSize;
+    this.memoryMonitorInfo = clone.memoryMonitorInfo;
   }
 
   public Map<Byte, List<ExprNodeDesc>> getExprs() {
@@ -691,14 +694,12 @@ public class JoinDesc extends AbstractOperatorDesc {
     streamAliases = joinDesc.streamAliases == null ? null : new ArrayList<String>(joinDesc.streamAliases);
   }
 
-  private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(JoinDesc.class);
-
-  public long getNoConditionalTaskSize() {
-    return noConditionalTaskSize;
+  public MemoryMonitorInfo getMemoryMonitorInfo() {
+    return memoryMonitorInfo;
   }
 
-  public void setNoConditionalTaskSize(final long noConditionalTaskSize) {
-    this.noConditionalTaskSize = noConditionalTaskSize;
+  public void setMemoryMonitorInfo(final MemoryMonitorInfo memoryMonitorInfo) {
+    this.memoryMonitorInfo = memoryMonitorInfo;
   }
 
   public long getInMemoryDataSize() {

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index f387e6a..0d8e459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
@@ -113,8 +114,8 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
     final List<TableDesc> valueTblDescs, final List<TableDesc> valueFilteredTblDescs, List<String> outputColumnNames,
     final int posBigTable, final JoinCondDesc[] conds,
     final Map<Byte, List<ExprNodeDesc>> filters, boolean noOuterJoin, String dumpFilePrefix,
-    final long noConditionalTaskSize, final long inMemoryDataSize) {
-    super(values, outputColumnNames, noOuterJoin, conds, filters, null, noConditionalTaskSize);
+    final MemoryMonitorInfo memoryMonitorInfo, final long inMemoryDataSize) {
+    super(values, outputColumnNames, noOuterJoin, conds, filters, null, memoryMonitorInfo);
     vectorDesc = null;
     this.keys = keys;
     this.keyTblDesc = keyTblDesc;

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 0287ff2..f62ba34 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -446,9 +446,9 @@ public class TestOperators extends TestCase {
     long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L;
     HiveConf hiveConf = new HiveConf();
 
-    // execution mode not set, default is returned
-    long gotSize = convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf);
-    assertEquals(defaultNoConditionalTaskSize, gotSize);
+    // execution mode not set, default noConditionalTaskSize is returned as the adjusted size
+    assertEquals(defaultNoConditionalTaskSize, convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize,
+      hiveConf).getAdjustedNoConditionalTaskSize());
     hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
 
     // default executors is 4, max slots is 3. so 3 * 20% of noconditional task size will be oversubscribed
@@ -457,7 +457,8 @@ public class TestOperators extends TestCase {
     int maxSlots = 3;
     long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots));
     assertEquals(expectedSize,
-      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+      convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf)
+        .getAdjustedNoConditionalTaskSize());
 
     // num executors is less than max executors per query (which is not expected case), default executors will be
     // chosen. 4 * 20% of noconditional task size will be oversubscribed
@@ -465,40 +466,18 @@ public class TestOperators extends TestCase {
     hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5");
     expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots));
     assertEquals(expectedSize,
-      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
-    // 2 concurrent sessions, 4 executors. 2 * 20% of noconditional task size will be oversubscribed
-    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
-    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2");
-    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2));
-    assertEquals(expectedSize,
-      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
-    // 4 concurrent sessions, 4 executors. 1 * 20% of noconditional task size will be oversubscribed
-    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
-    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "4");
-    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 1));
-    assertEquals(expectedSize,
-      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
-    // 8 concurrent sessions, 4 executors. default noconditioanl task will be used (no oversubscription)
-    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
-    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "8");
-    assertEquals(defaultNoConditionalTaskSize,
-      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
-    // 2 * 120% of noconditional task size will be oversubscribed
-    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
-    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2");
-    hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "1.2");
-    fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
-    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2));
-    assertEquals(expectedSize,
-      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
-    // 0 value for number of sessions
-    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "0");
-    assertEquals(defaultNoConditionalTaskSize,
-      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+      convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf)
+        .getAdjustedNoConditionalTaskSize());
+
+    // disable memory checking
+    hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "0");
+    assertFalse(
+      convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf).doMemoryMonitoring());
+
+    // invalid inflation factor
+    hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "10000");
+    hiveConf.set(HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR.varname, "0.0f");
+    assertFalse(
+      convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf).doMemoryMonitoring());
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out b/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
index 4f9c95a..66d7aec 100644
--- a/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
@@ -692,11 +692,11 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Map 2 <- Map 1 (CUSTOM_EDGE), Map 4 (BROADCAST_EDGE)
+        Map 2 <- Map 4 (BROADCAST_EDGE)
         Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: a
@@ -708,15 +708,6 @@ STAGE PLANS:
                       expressions: key (type: int), value (type: string)
                       outputColumnNames: _col0, _col1
                       Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: int)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: int)
-                        Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: string)
-            Execution mode: llap
-            LLAP IO: no inputs
-        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: b
@@ -728,15 +719,13 @@ STAGE PLANS:
                       expressions: key (type: int)
                       outputColumnNames: _col0
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      Map Join Operator
+                      Merge Join Operator
                         condition map:
                              Inner Join 0 to 1
                         keys:
                           0 _col0 (type: int)
                           1 _col0 (type: int)
                         outputColumnNames: _col1
-                        input vertices:
-                          0 Map 1
                         Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
                         Map Join Operator
                           condition map:
@@ -757,7 +746,6 @@ STAGE PLANS:
                               Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                               value expressions: _col0 (type: bigint)
             Execution mode: llap
-            LLAP IO: no inputs
         Map 4 
             Map Operator Tree:
                 TableScan