You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/04/26 17:31:08 UTC

hive git commit: HIVE-16503: LLAP: Oversubscribe memory for noconditional task size

Repository: hive
Updated Branches:
  refs/heads/master b271bcb7c -> 17b1110fa


HIVE-16503: LLAP: Oversubscribe memory for noconditional task size


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17b1110f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17b1110f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17b1110f

Branch: refs/heads/master
Commit: 17b1110fa8c01f2e9283546ea205d9ae2bdc13bc
Parents: b271bcb
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Wed Apr 26 10:30:54 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Wed Apr 26 10:30:54 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   9 +
 ql/pom.xml                                      |   7 +
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  74 +++-
 .../physical/LlapClusterStateForCompile.java    |  11 +-
 .../hadoop/hive/ql/exec/TestOperators.java      |  63 +++
 .../dynamic_semijoin_user_level.q               |   1 +
 .../test/queries/clientpositive/explainuser_4.q |   1 +
 .../test/queries/clientpositive/tez_smb_main.q  |  13 +-
 .../tez_vector_dynpart_hashjoin_1.q             |   1 +
 .../clientpositive/llap/tez_smb_main.q.out      | 426 +++++++++++++++++++
 10 files changed, 589 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8e5a9aa..d3ea824 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3143,6 +3143,15 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
       "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
       "executed in parallel.", "llap.daemon.num.executors"),
+    LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR("hive.llap.mapjoin.memory.oversubscribe.factor", 0.2f,
+      "Fraction of memory from hive.auto.convert.join.noconditionaltask.size that can be over subscribed\n" +
+        "by queries running in LLAP mode. This factor has to be from 0.0 to 1.0. Default is 20% over subscription.\n"),
+    LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query", 3,
+      "Used along with hive.llap.mapjoin.memory.oversubscribe.factor to limit the number of executors from\n" +
+        "which memory for mapjoin can be borrowed. Default 3 (from 3 other executors\n" +
+        "hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" +
+        "conversion decision will be made). This is only an upper bound. Lower bound is determined by number of\n" +
+        "executors and configured max concurrency."),
     LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4,
         "Maximum number of threads to be used for AM reporter. If this is lower than number of\n" +
         "executors in llap daemon, it would be set to number of executors at runtime.",

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index e5d063f..40a216b 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -229,6 +229,13 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-registry</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <version>${hadoop.version}</version>
       <optional>true</optional>

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 637bc54..ad77e87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
 import org.apache.hadoop.hive.ql.parse.GenTezUtils;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
@@ -68,6 +69,8 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
  * (aka shuffle join) with a map join (aka broadcast or fragment replicate
@@ -95,15 +98,18 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     JoinOperator joinOp = (JoinOperator) nd;
     long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
 
+    // adjust noconditional task size threshold for LLAP
+    maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf);
+
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
       if (retval == null) {
         return retval;
       } else {
-        fallbackToReduceSideJoin(joinOp, context);
+        fallbackToReduceSideJoin(joinOp, context, maxSize);
         return null;
       }
     }
@@ -120,13 +126,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     LOG.info("Estimated number of buckets " + numBuckets);
     int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize, true);
     if (mapJoinConversionPos < 0) {
-      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
+      Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx, maxSize);
       if (retval == null) {
         return retval;
       } else {
         // only case is full outer join with SMB enabled which is not possible. Convert to regular
         // join.
-        fallbackToReduceSideJoin(joinOp, context);
+        fallbackToReduceSideJoin(joinOp, context, maxSize);
         return null;
       }
     }
@@ -147,7 +153,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
 
@@ -164,15 +170,54 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return null;
   }
 
+  @VisibleForTesting
+  public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf conf) {
+    if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
+      LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
+      llapInfo.initClusterInfo();
+      final int executorsPerNode;
+      if (!llapInfo.hasClusterInfo()) {
+        LOG.warn("LLAP cluster information not available. Falling back to getting #executors from hiveconf..");
+        executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+      } else {
+        final int numExecutorsPerNodeFromCluster = llapInfo.getNumExecutorsPerNode();
+        if (numExecutorsPerNodeFromCluster == -1) {
+          LOG.warn("Cannot determine executor count from LLAP cluster information. Falling back to getting #executors" +
+            " from hiveconf..");
+          executorsPerNode = conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+        } else {
+          executorsPerNode = numExecutorsPerNodeFromCluster;
+        }
+      }
+      final int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+      if (numSessions > 0) {
+        final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
+        final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+        final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
+        final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
+        final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
+        LOG.info("No conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " +
+            "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " +
+            "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions,
+          availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize);
+        return Math.max(maxSize, llapMaxSize);
+      } else {
+        LOG.warn(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname + " returned value {}. Returning {}" +
+          " as no conditional task size for LLAP.", numSessions, maxSize);
+      }
+    }
+    return maxSize;
+  }
+
   @SuppressWarnings("unchecked")
   private Object checkAndConvertSMBJoin(OptimizeTezProcContext context, JoinOperator joinOp,
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+    TezBucketJoinProcCtx tezBucketJoinProcCtx, final long maxSize) throws SemanticException {
     // we cannot convert to bucket map join, we cannot convert to
     // map join either based on the size. Check if we can convert to SMB join.
     if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
       || ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
           && joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
     Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
@@ -201,7 +246,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       // contains aliases from sub-query
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
       return null;
     }
 
@@ -211,7 +256,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     } else {
       // we are just converting to a common merge join operator. The shuffle
       // join in map-reduce case.
-      fallbackToReduceSideJoin(joinOp, context);
+      fallbackToReduceSideJoin(joinOp, context, maxSize);
     }
     return null;
   }
@@ -928,15 +973,14 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return numBuckets;
   }
 
-  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+  private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+    final long maxSize)
     throws SemanticException {
     // Attempt dynamic partitioned hash join
     // Since we don't have big table index yet, must start with estimate of numReducers
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
-    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
-            context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD),
-            false);
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false, maxSize,false);
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -971,11 +1015,11 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     return false;
   }
 
-  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context)
+  private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContext context, final long maxSize)
       throws SemanticException {
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN) &&
         context.conf.getBoolVar(HiveConf.ConfVars.HIVEDYNAMICPARTITIONHASHJOIN)) {
-      if (convertJoinDynamicPartitionedHashJoin(joinOp, context)) {
+      if (convertJoinDynamicPartitionedHashJoin(joinOp, context, maxSize)) {
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index b2e8614..a5ed308 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -41,6 +41,7 @@ public class LlapClusterStateForCompile {
   private static final long CLUSTER_UPDATE_INTERVAL_NS = 120 * 1000000000L; // 2 minutes.
   private Long lastClusterUpdateNs;
   private Integer noConfigNodeCount, executorCount;
+  private int numExecutorsPerNode = -1;
   private LlapRegistryService svc;
   private final Configuration conf;
 
@@ -82,6 +83,10 @@ public class LlapClusterStateForCompile {
     return noConfigNodeCount;
   }
 
+  public int getNumExecutorsPerNode() {
+    return numExecutorsPerNode;
+  }
+
   public synchronized void initClusterInfo() {
     if (lastClusterUpdateNs != null) {
       long elapsed = System.nanoTime() - lastClusterUpdateNs;
@@ -111,7 +116,11 @@ public class LlapClusterStateForCompile {
         continue;
       }
       try {
-        executorsLocal += Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+        int numExecutors = Integer.parseInt(props.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+        executorsLocal += numExecutors;
+        if (numExecutorsPerNode == -1) {
+          numExecutorsPerNode = numExecutors;
+        }
       } catch (NumberFormatException e) {
         ++noConfigNodesLocal;
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 57e573a..b569549 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOContextMap;
+import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -435,4 +436,66 @@ public class TestOperators extends TestCase {
     assertEquals(20, result.size());
     driver.close();
   }
+
+  @Test
+  public void testNoConditionalTaskSizeForLlap() {
+    ConvertJoinMapJoin convertJoinMapJoin = new ConvertJoinMapJoin();
+    long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L;
+    HiveConf hiveConf = new HiveConf();
+
+    // execution mode not set, default is returned
+    long gotSize = convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf);
+    assertEquals(defaultNoConditionalTaskSize, gotSize);
+    hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
+
+    // default executors is 4, max slots is 3. so 3 * 20% of noconditional task size will be oversubscribed
+    hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "0.2");
+    double fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+    int maxSlots = 3;
+    long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // num executors is less than max executors per query (which is not expected case), default executors will be
+    // chosen. 4 * 20% of noconditional task size will be oversubscribed
+    int chosenSlots = hiveConf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+    hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5");
+    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 2 concurrent sessions, 4 executors. 2 * 20% of noconditional task size will be oversubscribed
+    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2");
+    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 4 concurrent sessions, 4 executors. 1 * 20% of noconditional task size will be oversubscribed
+    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "4");
+    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 1));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 8 concurrent sessions, 4 executors. default noconditional task size will be used (no oversubscription)
+    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "8");
+    assertEquals(defaultNoConditionalTaskSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 2 * 120% of noconditional task size will be oversubscribed
+    hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2");
+    hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "1.2");
+    fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+    expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2));
+    assertEquals(expectedSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+
+    // 0 value for number of sessions
+    hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "0");
+    assertEquals(defaultNoConditionalTaskSize,
+      convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
index 88ab46e..11bd17a 100644
--- a/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
@@ -11,6 +11,7 @@ set hive.stats.autogather=true;
 set hive.tez.bigtable.minsize.semijoin.reduction=1;
 set hive.tez.min.bloom.filter.entries=1;
 set hive.stats.fetch.column.stats=true;
+set hive.llap.memory.oversubscription.max.executors.per.query=0;
 
 -- Create Tables
 create table alltypesorc_int ( cint int, cstring string ) stored as ORC;

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/queries/clientpositive/explainuser_4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/explainuser_4.q b/ql/src/test/queries/clientpositive/explainuser_4.q
index f58afa8..8f92140 100644
--- a/ql/src/test/queries/clientpositive/explainuser_4.q
+++ b/ql/src/test/queries/clientpositive/explainuser_4.q
@@ -1,4 +1,5 @@
 set hive.mapred.mode=nonstrict;
+set hive.llap.memory.oversubscription.max.executors.per.query=0;
 
 set hive.explain.user=true;
 set hive.auto.convert.join=false;

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/queries/clientpositive/tez_smb_main.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_main.q b/ql/src/test/queries/clientpositive/tez_smb_main.q
index ee24691..e4ab75a 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_main.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_main.q
@@ -70,9 +70,15 @@ from tab a join tab_part b on a.key = b.key;
 set hive.auto.convert.join.noconditionaltask.size=500;
 set hive.mapjoin.hybridgrace.minwbsize=125;
 set hive.mapjoin.hybridgrace.minnumpartitions=4;
+set hive.llap.memory.oversubscription.max.executors.per.query=0;
+explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+set hive.llap.memory.oversubscription.max.executors.per.query=3;
 explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
 select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
 
+set hive.llap.memory.oversubscription.max.executors.per.query=0;
+explain select count(*) from tab a join tab_part b on a.value = b.value;
+set hive.llap.memory.oversubscription.max.executors.per.query=3;
 explain select count(*) from tab a join tab_part b on a.value = b.value;
 select count(*) from tab a join tab_part b on a.value = b.value;
 
@@ -84,10 +90,15 @@ select s2.key as key, s2.value as value from tab s2
 
 set hive.auto.convert.join.noconditionaltask.size=10000;
 
+set hive.llap.memory.oversubscription.max.executors.per.query=0;
+explain select count(*) from tab a join tab_part b on a.value = b.value;
+set hive.llap.memory.oversubscription.max.executors.per.query=2;
 explain select count(*) from tab a join tab_part b on a.value = b.value;
 select count(*) from tab a join tab_part b on a.value = b.value;
 
-
+set hive.llap.memory.oversubscription.max.executors.per.query=0;
+explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
+set hive.llap.memory.oversubscription.max.executors.per.query=2;
 explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
 select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value;
 set hive.stats.fetch.column.stats=true;

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
index 7dd3003..04683d2 100644
--- a/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
+++ b/ql/src/test/queries/clientpositive/tez_vector_dynpart_hashjoin_1.q
@@ -3,6 +3,7 @@ set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
 set hive.auto.convert.join=false;
 set hive.optimize.dynamic.partition.hashjoin=false;
+set hive.llap.memory.oversubscription.max.executors.per.query=0;
 
 -- First try with regular mergejoin
 explain

http://git-wip-us.apache.org/repos/asf/hive/blob/17b1110f/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out b/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
index b583bff..4f9c95a 100644
--- a/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
@@ -679,6 +679,126 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 2 <- Map 1 (CUSTOM_EDGE), Map 4 (BROADCAST_EDGE)
+        Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col1
+                        input vertices:
+                          0 Map 1
+                        Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                        Map Join Operator
+                          condition map:
+                               Inner Join 0 to 1
+                          keys:
+                            0 _col1 (type: string)
+                            1 _col0 (type: string)
+                          input vertices:
+                            1 Map 4
+                          Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                          Group By Operator
+                            aggregations: count()
+                            mode: hash
+                            outputColumnNames: _col0
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                            Reduce Output Operator
+                              sort order: 
+                              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                              value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: c
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
 PREHOOK: query: select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src1
@@ -792,6 +912,102 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: explain select count(*) from tab a join tab_part b on a.value = b.value
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select count(*) from tab a join tab_part b on a.value = b.value
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col1
+                      Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: string)
+                        Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col1 (type: string)
+                  1 _col1 (type: string)
+                Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
 PREHOOK: query: select count(*) from tab a join tab_part b on a.value = b.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tab
@@ -1042,6 +1258,96 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: explain select count(*) from tab a join tab_part b on a.value = b.value
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select count(*) from tab a join tab_part b on a.value = b.value
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 2 <- Map 1 (BROADCAST_EDGE)
+        Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col1
+                      Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col1 (type: string)
+                        Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col1 (type: string)
+                          1 _col1 (type: string)
+                        input vertices:
+                          0 Map 1
+                        Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          aggregations: count()
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
 PREHOOK: query: select count(*) from tab a join tab_part b on a.value = b.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tab
@@ -1177,6 +1483,126 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 2 <- Map 1 (CUSTOM_EDGE), Map 4 (BROADCAST_EDGE)
+        Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (key is not null and value is not null) (type: boolean)
+                    Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: b
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: int)
+                          1 _col0 (type: int)
+                        outputColumnNames: _col1
+                        input vertices:
+                          0 Map 1
+                        Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
+                        Map Join Operator
+                          condition map:
+                               Inner Join 0 to 1
+                          keys:
+                            0 _col1 (type: string)
+                            1 _col0 (type: string)
+                          input vertices:
+                            1 Map 4
+                          Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE
+                          Group By Operator
+                            aggregations: count()
+                            mode: hash
+                            outputColumnNames: _col0
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                            Reduce Output Operator
+                              sort order: 
+                              Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                              value expressions: _col0 (type: bigint)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: c
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
 PREHOOK: query: select count(*) from tab a join tab_part b on a.key = b.key join src1 c on a.value = c.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src1