You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/12/15 00:00:03 UTC

[1/2] hive git commit: HIVE-18153 : refactor reopen and file management in TezTask (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master ca96613da -> e120bd8b0


HIVE-18153 : refactor reopen and file management in TezTask (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/89dbf4e9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/89dbf4e9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/89dbf4e9

Branch: refs/heads/master
Commit: 89dbf4e904592318da954eaf94548ec1b130e17c
Parents: ca96613
Author: sergey <se...@apache.org>
Authored: Thu Dec 14 15:53:44 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Dec 14 15:53:44 2017 -0800

----------------------------------------------------------------------
 ql/pom.xml                                      |   2 -
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |   1 +
 .../hadoop/hive/ql/exec/tez/DagUtils.java       | 102 ++++-----
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java |  23 +-
 .../hive/ql/exec/tez/TezSessionPoolManager.java |  36 +---
 .../hive/ql/exec/tez/TezSessionPoolSession.java |  20 +-
 .../hive/ql/exec/tez/TezSessionState.java       | 208 +++++++++++--------
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 189 ++++++-----------
 .../hive/ql/exec/tez/WorkloadManager.java       | 118 +++++++----
 .../ql/exec/tez/monitoring/TezJobMonitor.java   |   2 +-
 .../ql/udf/generic/GenericUDTFGetSplits.java    |   5 +-
 .../hive/ql/exec/tez/SampleTezSessionState.java |   7 +-
 .../hive/ql/exec/tez/TestTezSessionPool.java    |  30 ++-
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    |  73 ++-----
 .../hive/ql/exec/tez/TestWorkloadManager.java   |  10 +-
 15 files changed, 378 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index f35a4c8..cbf71cd 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -225,8 +225,6 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-registry</artifactId>
       <version>${hadoop.version}</version>
-      <optional>true</optional>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 88a75ed..3f470eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -400,6 +400,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       SessionState ss = SessionState.get();
       // TODO: why is there a TezSession in MR ExecDriver?
       if (ss != null && HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        // TODO: this is the only place that uses keepTmpDir. Why?
         TezSessionPoolManager.closeIfNotDefault(ss.getTezSession(), true);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 6c1afa6..e4a6f62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -17,14 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collection;
 
+import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-
 import javax.security.auth.login.LoginException;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
@@ -40,7 +39,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
@@ -541,16 +539,15 @@ public class DagUtils {
     }
   }
 
-  private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
-      List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx,
-      VertexType vertexType)
-      throws Exception {
+  private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSystem fs,
+      Path mrScratchDir, Context ctx, VertexType vertexType,
+      Map<String, LocalResource> localResources) throws Exception {
     Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
     if (mergeJoinWork.getMainWork() instanceof MapWork) {
       List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
       MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
-      Vertex mergeVx =
-          createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
+      Vertex mergeVx = createVertex(
+          conf, mapWork, fs, mrScratchDir, ctx, vertexType, localResources);
 
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set
@@ -580,10 +577,8 @@ public class DagUtils {
       mergeVx.setVertexManagerPlugin(desc);
       return mergeVx;
     } else {
-      Vertex mergeVx =
-          createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs,
-              mrScratchDir, ctx);
-      return mergeVx;
+      return createVertex(conf,
+          (ReduceWork) mergeJoinWork.getMainWork(), fs, mrScratchDir, ctx, localResources);
     }
   }
 
@@ -591,11 +586,8 @@ public class DagUtils {
    * Helper function to create Vertex from MapWork.
    */
   private Vertex createVertex(JobConf conf, MapWork mapWork,
-      LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
-      Path mrScratchDir, Context ctx, VertexType vertexType)
-      throws Exception {
-
-    Path tezDir = getTezDir(mrScratchDir);
+      FileSystem fs, Path mrScratchDir, Context ctx, VertexType vertexType,
+      Map<String, LocalResource> localResources) throws Exception {
 
     // set up the operator plan
     Utilities.cacheMapWork(conf, mapWork, mrScratchDir);
@@ -726,13 +718,6 @@ public class DagUtils {
     // Add the actual source input
     String alias = mapWork.getAliasToWork().keySet().iterator().next();
     map.addDataSource(alias, dataSource);
-
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    localResources.put(getBaseName(appJarLr), appJarLr);
-    for (LocalResource lr: additionalLr) {
-      localResources.put(getBaseName(lr), lr);
-    }
-
     map.addTaskLocalFiles(localResources);
     return map;
   }
@@ -772,9 +757,9 @@ public class DagUtils {
   /*
    * Helper function to create Vertex for given ReduceWork.
    */
-  private Vertex createVertex(JobConf conf, ReduceWork reduceWork,
-      LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
-      Path mrScratchDir, Context ctx) throws Exception {
+  private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs,
+      Path mrScratchDir, Context ctx, Map<String, LocalResource> localResources)
+          throws Exception {
 
     // set up operator plan
     conf.set(Utilities.INPUT_NAME, reduceWork.getName());
@@ -796,17 +781,28 @@ public class DagUtils {
     reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
     reducer.setExecutionContext(vertexExecutionContext);
     reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
-
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    localResources.put(getBaseName(appJarLr), appJarLr);
-    for (LocalResource lr: additionalLr) {
-      localResources.put(getBaseName(lr), lr);
-    }
     reducer.addTaskLocalFiles(localResources);
-
     return reducer;
   }
 
+  public static Map<String, LocalResource> createTezLrMap(
+      LocalResource appJarLr, Collection<LocalResource> additionalLr) {
+    // Note: interestingly, this would exclude LLAP app jars that the session adds for the LLAP case.
+    //       Of course it doesn't matter because vertices run ON LLAP and have those jars, and
+    //       moreover we don't localize jars for the vertices on LLAP anyway; but in theory
+    //       this is still fragile code that assumes there is one and only one app jar.
+    Map<String, LocalResource> localResources = new HashMap<>();
+    if (appJarLr != null) {
+      localResources.put(getBaseName(appJarLr), appJarLr);
+    }
+    if (additionalLr != null) {
+      for (LocalResource lr: additionalLr) {
+        localResources.put(getBaseName(lr), lr);
+      }
+    }
+    return localResources;
+  }
+
   /*
    * Helper method to create a yarn local resource.
    */
@@ -1064,7 +1060,7 @@ public class DagUtils {
   /*
    * Helper function to retrieve the basename of a local resource
    */
-  public String getBaseName(LocalResource lr) {
+  public static String getBaseName(LocalResource lr) {
     return FilenameUtils.getName(lr.getResource().getFile());
   }
 
@@ -1254,30 +1250,26 @@ public class DagUtils {
    * @param work The instance of BaseWork representing the actual work to be performed
    * by this vertex.
    * @param scratchDir HDFS scratch dir for this execution unit.
-   * @param appJarLr Local resource for hive-exec.
-   * @param additionalLr
    * @param fileSystem FS corresponding to scratchDir and LocalResources
    * @param ctx This query's context
    * @return Vertex
    */
   @SuppressWarnings("deprecation")
   public Vertex createVertex(JobConf conf, BaseWork work,
-      Path scratchDir, LocalResource appJarLr,
-      List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,
-      TezWork tezWork, VertexType vertexType) throws Exception {
+      Path scratchDir, FileSystem fileSystem, Context ctx, boolean hasChildren,
+      TezWork tezWork, VertexType vertexType, Map<String, LocalResource> localResources) throws Exception {
 
     Vertex v = null;
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
     if (work instanceof MapWork) {
-      v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx,
-              vertexType);
+      v = createVertex(
+          conf, (MapWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
     } else if (work instanceof ReduceWork) {
-      v = createVertex(conf, (ReduceWork) work, appJarLr,
-          additionalLr, fileSystem, scratchDir, ctx);
+      v = createVertex(conf, (ReduceWork) work, fileSystem, scratchDir, ctx, localResources);
     } else if (work instanceof MergeJoinWork) {
-      v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
-              ctx, vertexType);
+      v = createVertex(
+          conf, (MergeJoinWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
       // set VertexManagerPlugin if whether it's a cross product destination vertex
       List<String> crossProductSources = new ArrayList<>();
       for (BaseWork parentWork : tezWork.getParents(work)) {
@@ -1522,4 +1514,18 @@ public class DagUtils {
     // -Xmx not specified
     return -1;
   }
+
+  // Returns FILE-type resources keyed by base name; it is unclear whether this is still needed.
+  public static Map<String, LocalResource> getResourcesUpdatableForAm(
+      Collection<LocalResource> allNonAppResources) {
+    HashMap<String, LocalResource> allNonAppFileResources = new HashMap<>();
+    if (allNonAppResources == null) return allNonAppFileResources;
+    for (LocalResource lr : allNonAppResources) {
+      if (lr.getType() == LocalResourceType.FILE) {
+        // TEZ AM will only localize FILE (no script operators in the AM)
+        allNonAppFileResources.put(DagUtils.getBaseName(lr), lr);
+      }
+    }
+    return allNonAppFileResources;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 3bcf657..6e2dfe1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -217,7 +217,10 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
         }
       }
       // If there are async requests, satisfy them first.
-      if (!asyncRequests.isEmpty() && session.tryUse(false)) {
+      if (!asyncRequests.isEmpty()) {
+        if (!session.tryUse(false)) {
+          return true; // Session has expired and will be returned to us later.
+        }
         future = asyncRequests.poll();
       }
       if (future == null) {
@@ -238,24 +241,12 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
     return true;
   }
 
-  void replaceSession(SessionType oldSession, boolean keepTmpDir,
-      String[] additionalFilesArray) throws Exception {
-    // Retain the stuff from the old session.
+  void replaceSession(SessionType oldSession) throws Exception {
     // Re-setting the queue config is an old hack that we may remove in future.
     SessionType newSession = sessionObjFactory.create(oldSession);
-    Path scratchDir = oldSession.getTezScratchDir();
     String queueName = oldSession.getQueueName();
-    Set<String> additionalFiles = null;
-    if (additionalFilesArray != null) {
-      additionalFiles = new HashSet<>();
-      for (String file : additionalFilesArray) {
-        additionalFiles.add(file);
-      }
-    } else {
-      additionalFiles = oldSession.getAdditionalFilesNotFromConf();
-    }
     try {
-      oldSession.close(keepTmpDir);
+      oldSession.close(false);
     } finally {
       poolLock.lock();
       try {
@@ -280,7 +271,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
         // probably just get rid of the thread local usage in TezSessionState.
         SessionState.setCurrentSessionState(parentSessionState);
       }
-      newSession.open(additionalFiles, scratchDir);
+      newSession.open();
       if (!putSessionBack(newSession, false)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Closing an unneeded session " + newSession

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 8417ebb..3c1b8d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,16 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources;
+
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
@@ -43,7 +41,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -444,44 +441,35 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
 
   /** Reopens the session that was found to not be running. */
   @Override
-  public TezSessionState reopen(TezSessionState sessionState,
-      Configuration conf, String[] additionalFiles) throws Exception {
+  public TezSessionState reopen(TezSessionState sessionState) throws Exception {
     HiveConf sessionConf = sessionState.getConf();
     if (sessionState.getQueueName() != null
         && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
       sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
     }
-    reopenInternal(sessionState, additionalFiles);
+    reopenInternal(sessionState);
     return sessionState;
   }
 
   static void reopenInternal(
-      TezSessionState sessionState, String[] additionalFiles) throws Exception {
-    Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf();
-    // TODO: implies the session files and the array are the same if not null; why? very brittle
-    if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty())
-        && (additionalFiles != null)) {
-      oldAdditionalFiles = new HashSet<>();
-      for (String file : additionalFiles) {
-        oldAdditionalFiles.add(file);
-      }
-    }
+      TezSessionState sessionState) throws Exception {
+    HiveResources resources = sessionState.extractHiveResources();
     // TODO: close basically resets the object to a bunch of nulls.
     //       We should ideally not reuse the object because it's pointless and error-prone.
-    sessionState.close(true);
+    sessionState.close(false);
     // Note: scratchdir is reused implicitly because the sessionId is the same.
-    sessionState.open(oldAdditionalFiles, null);
+    sessionState.open(resources);
   }
 
 
-  public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
+  public void closeNonDefaultSessions() throws Exception {
     List<TezSessionState> sessionsToClose = null;
     synchronized (openSessions) {
       sessionsToClose = new ArrayList<TezSessionState>(openSessions);
     }
     for (TezSessionState sessionState : sessionsToClose) {
       System.err.println("Shutting down tez session.");
-      closeIfNotDefault(sessionState, keepTmpDir);
+      closeIfNotDefault(sessionState, false);
     }
   }
 
@@ -492,9 +480,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     if (queueName == null) {
       LOG.warn("Pool session has a null queue: " + oldSession);
     }
-    TezSessionPoolSession newSession = createAndInitSession(
-      queueName, oldSession.isDefault(), oldSession.getConf());
-    defaultSessionPool.replaceSession(oldSession, false, null);
+    defaultSessionPool.replaceSession(oldSession);
   }
 
   /** Called by TezSessionPoolSession when opened. */

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index b3ccd24..96ade50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -19,11 +19,8 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import com.google.common.util.concurrent.SettableFuture;
-
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
-
 import org.apache.hadoop.conf.Configuration;
-
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Collection;
@@ -31,9 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import javax.security.auth.login.LoginException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,7 +37,6 @@ import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 import org.apache.tez.dag.api.TezException;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -66,8 +60,7 @@ class TezSessionPoolSession extends TezSessionState {
 
     void returnAfterUse(TezSessionPoolSession session) throws Exception;
 
-    TezSessionState reopen(TezSessionState session, Configuration conf,
-      String[] inputOutputJars) throws Exception;
+    TezSessionState reopen(TezSessionState session) throws Exception;
 
     void destroy(TezSessionState session) throws Exception;
   }
@@ -128,10 +121,10 @@ class TezSessionPoolSession extends TezSessionState {
   }
 
   @Override
-  protected void openInternal(Collection<String> additionalFiles,
-      boolean isAsync, LogHelper console, Path scratchDir)
+  protected void openInternal(String[] additionalFiles,
+      boolean isAsync, LogHelper console, HiveResources resources)
           throws IOException, LoginException, URISyntaxException, TezException {
-    super.openInternal(additionalFiles, isAsync, console, scratchDir);
+    super.openInternal(additionalFiles, isAsync, console, resources);
     parent.registerOpenSession(this);
     if (expirationTracker != null) {
       expirationTracker.addToExpirationQueue(this);
@@ -206,9 +199,8 @@ class TezSessionPoolSession extends TezSessionState {
   }
 
   @Override
-  public TezSessionState reopen(
-      Configuration conf, String[] inputOutputJars) throws Exception {
-    return parent.reopen(this, conf, inputOutputJars);
+  public TezSessionState reopen() throws Exception {
+    return parent.reopen(this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index dd879fc..5e892c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.util.Collection;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -37,7 +38,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.security.auth.login.LoginException;
-
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -90,7 +90,6 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
@@ -126,8 +125,18 @@ public class TezSessionState {
 
   private AtomicReference<String> ownerThread = new AtomicReference<>(null);
 
-  private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
-  private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
+  public static final class HiveResources {
+    public HiveResources(Path dagResourcesDir) {
+      this.dagResourcesDir = dagResourcesDir;
+    }
+    /** A directory that will contain resources related to DAGs and specified in configs. */
+    public final Path dagResourcesDir;
+    public final Set<String> additionalFilesNotFromConf = new HashSet<>();
+    /** Localized resources of this session; both from conf and not from conf (above). */
+    public final Set<LocalResource> localizedResources = new HashSet<>();
+  }
+
+  private HiveResources resources;
   @JsonProperty("doAsEnabled")
   private boolean doAsEnabled;
   private boolean isLegacyLlapMode;
@@ -201,45 +210,32 @@ public class TezSessionState {
   }
 
   public void open() throws IOException, LoginException, URISyntaxException, TezException {
-    Set<String> noFiles = null;
-    open(noFiles, null);
+    String[] noFiles = null;
+    open(noFiles);
   }
 
   /**
    * Creates a tez session. A session is tied to either a cli/hs2 session. You can
    * submit multiple DAGs against a session (as long as they are executed serially).
-   * @throws IOException
-   * @throws URISyntaxException
-   * @throws LoginException
-   * @throws TezException
-   * @throws InterruptedException
    */
-  public void open(String[] additionalFiles)
+  public void open(String[] additionalFilesNotFromConf)
       throws IOException, LoginException, URISyntaxException, TezException {
-    openInternal(setFromArray(additionalFiles), false, null, null);
+    openInternal(additionalFilesNotFromConf, false, null, null);
   }
 
-  private static Set<String> setFromArray(String[] additionalFiles) {
-    if (additionalFiles == null) return null;
-    Set<String> files = new HashSet<>();
-    for (String originalFile : additionalFiles) {
-      files.add(originalFile);
-    }
-    return files;
+
+  public void open(HiveResources resources)
+      throws LoginException, IOException, URISyntaxException, TezException {
+    openInternal(null, false, null, resources);
   }
 
   public void beginOpen(String[] additionalFiles, LogHelper console)
       throws IOException, LoginException, URISyntaxException, TezException {
-    openInternal(setFromArray(additionalFiles), true, console, null);
+    openInternal(additionalFiles, true, console, null);
   }
 
-  public void open(Collection<String> additionalFiles, Path scratchDir)
-      throws LoginException, IOException, URISyntaxException, TezException {
-    openInternal(additionalFiles, false, null, scratchDir);
-  }
-
-  protected void openInternal(Collection<String> additionalFiles,
-      boolean isAsync, LogHelper console, Path scratchDir)
+  protected void openInternal(String[] additionalFilesNotFromConf,
+      boolean isAsync, LogHelper console, HiveResources resources)
           throws IOException, LoginException, URISyntaxException, TezException {
     // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two.
     String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
@@ -258,25 +254,25 @@ public class TezSessionState {
     user = ugi.getShortUserName();
     LOG.info("User of session id " + sessionId + " is " + user);
 
-    // create the tez tmp dir
-    tezScratchDir = scratchDir == null ? createTezDir(sessionId) : scratchDir;
-
-    additionalFilesNotFromConf.clear();
-    if (additionalFiles != null) {
-      additionalFilesNotFromConf.addAll(additionalFiles);
+    // Create the tez tmp dir and a directory for Hive resources.
+    tezScratchDir = createTezDir(sessionId, null);
+    if (resources != null) {
+      // If we are getting the resources externally, don't relocalize anything.
+      this.resources = resources;
+    } else {
+      this.resources = new HiveResources(createTezDir(sessionId, "resources"));
+      ensureLocalResources(conf, additionalFilesNotFromConf);
     }
 
-    refreshLocalResourcesFromConf(conf);
-
     // unless already installed on all the cluster nodes, we'll have to
     // localize hive-exec.jar as well.
     appJarLr = createJarLocalResource(utils.getExecJarPathLocal());
 
     // configuration for the application master
     final Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
-    commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
-    for (LocalResource lr : localizedResources) {
-      commonLocalResources.put(utils.getBaseName(lr), lr);
+    commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
+    for (LocalResource lr : this.resources.localizedResources) {
+      commonLocalResources.put(DagUtils.getBaseName(lr), lr);
     }
 
     if (llapMode) {
@@ -284,7 +280,7 @@ public class TezSessionState {
       addJarLRByClass(LlapTaskSchedulerService.class, commonLocalResources);
       addJarLRByClass(LlapProtocolClientImpl.class, commonLocalResources);
       addJarLRByClass(LlapProtocolClientProxy.class, commonLocalResources);
-      addJarLRByClassName("org.apache.hadoop.registry.client.api.RegistryOperations", commonLocalResources);
+      addJarLRByClass(RegistryOperations.class, commonLocalResources);
     }
 
     // Create environment for AM.
@@ -556,36 +552,54 @@ public class TezSessionState {
     tezConf.set(TezConfiguration.TEZ_AM_MODIFY_ACLS, modifyStr);
   }
 
-  public void refreshLocalResourcesFromConf(HiveConf conf)
-      throws IOException, LoginException, URISyntaxException, TezException {
-
-    String dir = tezScratchDir.toString();
-
-    localizedResources.clear();
+  /** This is called in openInternal and in TezTask.updateSession to localize conf resources. */
+  public void ensureLocalResources(Configuration conf, String[] newFilesNotFromConf)
+          throws IOException, LoginException, URISyntaxException, TezException {
+    String dir = resources.dagResourcesDir.toString();
+    resources.localizedResources.clear();
 
-    // these are local resources set through add file, jar, etc
+    // Always localize files from conf; duplicates are handled at the FS level.
+    // TODO: we could do the same thing as below and only localize if missing.
+    //       That could be especially valuable given that this is almost always the same set.
     List<LocalResource> lrs = utils.localizeTempFilesFromConf(dir, conf);
     if (lrs != null) {
-      localizedResources.addAll(lrs);
+      resources.localizedResources.addAll(lrs);
     }
 
-    // these are local resources that are set through the mr "tmpjars" property; skip session files.
-    List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf,
-      additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]),
-      DagUtils.getTempFilesFromConf(conf));
-
-    if (handlerLr != null) {
-      localizedResources.addAll(handlerLr);
+    // Localize the non-conf resources that are missing from the current list.
+    List<LocalResource> newResources = null;
+    if (newFilesNotFromConf != null && newFilesNotFromConf.length > 0) {
+      boolean hasResources = !resources.additionalFilesNotFromConf.isEmpty();
+      if (hasResources) {
+        for (String s : newFilesNotFromConf) {
+          hasResources = resources.additionalFilesNotFromConf.contains(s);
+          if (!hasResources) break;
+        }
+      }
+      if (!hasResources) {
+        String[] skipFilesFromConf = DagUtils.getTempFilesFromConf(conf);
+        newResources = utils.localizeTempFiles(dir, conf, newFilesNotFromConf, skipFilesFromConf);
+        if (newResources != null) {
+          resources.localizedResources.addAll(newResources);
+        }
+        for (String fullName : newFilesNotFromConf) {
+          resources.additionalFilesNotFromConf.add(fullName);
+        }
+      }
     }
-  }
 
-  public boolean hasResources(String[] localAmResources) {
-    if (localAmResources == null || localAmResources.length == 0) return true;
-    if (additionalFilesNotFromConf.isEmpty()) return false;
-    for (String s : localAmResources) {
-      if (!additionalFilesNotFromConf.contains(s)) return false;
+    // Finally add the files to AM. The old code seems to do this twice, first for all the new
+    // resources regardless of type; and then for all the session resources that are not of type
+    // file (see branch-1 calls to addAppMasterLocalFiles: from updateSession and with resourceMap
+    // from submit).
+    // TODO: Is this all needed? Also confirm `session` is non-null when reached from openInternal.
+    if (newResources != null && !newResources.isEmpty()) {
+      session.addAppMasterLocalFiles(DagUtils.createTezLrMap(null, newResources));
+    }
+    if (!resources.localizedResources.isEmpty()) {
+      session.addAppMasterLocalFiles(
+          DagUtils.getResourcesUpdatableForAm(resources.localizedResources));
     }
-    return true;
   }
 
   /**
@@ -593,11 +607,11 @@ public class TezSessionState {
    * further DAGs can be executed against it. Only called by session management classes; some
    * sessions should not simply be closed by users - e.g. pool sessions need to be restarted.
    *
-   * @param keepTmpDir
+   * @param keepDagFilesDir
    *          whether or not to remove the scratch dir at the same time.
    * @throws Exception
    */
-  void close(boolean keepTmpDir) throws Exception {
+  void close(boolean keepDagFilesDir) throws Exception {
     if (session != null) {
       LOG.info("Closing Tez Session");
       closeClient(session);
@@ -618,20 +632,16 @@ public class TezSessionState {
       }
     }
 
-    if (!keepTmpDir) {
-      cleanupScratchDir();
+    cleanupScratchDir();
+    if (!keepDagFilesDir) {
+      cleanupDagResources();
     }
     session = null;
     sessionFuture = null;
     console = null;
     tezScratchDir = null;
+    // Do not reset dag resources; if it wasn't cleaned it's still needed.
     appJarLr = null;
-    additionalFilesNotFromConf.clear();
-    localizedResources.clear();
-  }
-
-  public Set<String> getAdditionalFilesNotFromConf() {
-    return additionalFilesNotFromConf;
   }
 
   private void closeClient(TezClient client) throws TezException,
@@ -643,12 +653,18 @@ public class TezSessionState {
     }
   }
 
-  protected final void cleanupScratchDir () throws IOException {
+  protected final void cleanupScratchDir() throws IOException {
     FileSystem fs = tezScratchDir.getFileSystem(conf);
     fs.delete(tezScratchDir, true);
     tezScratchDir = null;
   }
 
+  /**
+   * Deletes the DAG resources directory owned by this session and drops the reference to it.
+   * Does nothing if the session holds no resources (e.g. they were taken away via
+   * extractHiveResources, or were already cleaned up).
+   *
+   * @throws IOException if the file system cannot be obtained or the delete fails.
+   */
+  protected final void cleanupDagResources() throws IOException {
+    if (resources == null) {
+      return;
+    }
+    FileSystem fs = resources.dagResourcesDir.getFileSystem(conf);
+    fs.delete(resources.dagResourcesDir, true);
+    resources = null;
+  }
+
   public String getSessionId() {
     return sessionId;
   }
@@ -675,10 +691,6 @@ public class TezSessionState {
     return session;
   }
 
-  public Path getTezScratchDir() {
-    return tezScratchDir;
-  }
-
   public LocalResource getAppJarLr() {
     return appJarLr;
   }
@@ -687,11 +699,11 @@ public class TezSessionState {
    * createTezDir creates a temporary directory in the scratchDir folder to
    * be used with Tez. Assumes scratchDir exists.
    */
-  private Path createTezDir(String sessionId) throws IOException {
+  private Path createTezDir(String sessionId, String suffix) throws IOException {
     // tez needs its own scratch dir (per session)
     // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
     Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR);
-    tezDir = new Path(tezDir, sessionId);
+    tezDir = new Path(tezDir, sessionId + ((suffix == null) ? "" : ("-" + suffix)));
     FileSystem fs = tezDir.getFileSystem(conf);
     FsPermission fsPermission = new FsPermission(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
     fs.mkdirs(tezDir, fsPermission);
@@ -759,9 +771,8 @@ public class TezSessionState {
     final File jar =
         new File(Utilities.jarFinderGetJar(clazz));
     final String localJarPath = jar.toURI().toURL().toExternalForm();
-    final LocalResource jarLr =
-      createJarLocalResource(localJarPath);
-    lrMap.put(utils.getBaseName(jarLr), jarLr);
+    final LocalResource jarLr = createJarLocalResource(localJarPath);
+    lrMap.put(DagUtils.getBaseName(jarLr), jarLr);
   }
 
   private String getSha(final Path localFile) throws IOException, IllegalArgumentException {
@@ -808,7 +819,7 @@ public class TezSessionState {
   }
 
   public List<LocalResource> getLocalizedResources() {
-    return new ArrayList<>(localizedResources);
+    return new ArrayList<>(resources.localizedResources);
   }
 
   public String getUser() {
@@ -849,10 +860,9 @@ public class TezSessionState {
     TezSessionPoolManager.getInstance().returnSession(this);
   }
 
-  public TezSessionState reopen(
-      Configuration conf, String[] inputOutputJars) throws Exception {
+  public TezSessionState reopen() throws Exception {
     // By default, TezSessionPoolManager handles this for both pool and non-pool session.
-    return TezSessionPoolManager.getInstance().reopen(this, conf, inputOutputJars);
+    return TezSessionPoolManager.getInstance().reopen(this);
   }
 
   public void destroy() throws Exception {
@@ -875,4 +885,28 @@ public class TezSessionState {
   public KillQuery getKillQuery() {
     return killQuery;
   }
+
+  /**
+   * Detaches the DAG resources from this session and returns them to the caller.
+   * After this call the session's resources field is null; ownership passes to the caller.
+   *
+   * @return the resources previously held by this session; may be null.
+   */
+  public HiveResources extractHiveResources() {
+    final HiveResources detached = this.resources;
+    this.resources = null;
+    return detached;
+  }
+
+  /**
+   * Installs new DAG resources in this session and releases the previously held ones.
+   *
+   * @param resources the resources to install; not dereferenced here, so null is accepted.
+   * @param isAsync if false, the old resources directory is deleted here (best-effort, errors
+   *                are logged); if true, it is not deleted but returned to the caller instead.
+   * @return the old resources directory the caller is responsible for deleting, or null if
+   *         there was none, it was already deleted, or the same resources were re-installed.
+   */
+  public Path replaceHiveResources(HiveResources resources, boolean isAsync) {
+    Path dir = null;
+    // Re-installing the very same resources must not delete their (still used) directory.
+    if (this.resources != null && this.resources != resources) {
+      dir = this.resources.dagResourcesDir;
+      if (!isAsync) {
+        try {
+          dir.getFileSystem(conf).delete(dir, true);
+        } catch (Exception ex) {
+          // Best-effort cleanup; keep the cause so the failure can be diagnosed.
+          LOG.error("Failed to delete the old resources directory " + dir + "; ignoring", ex);
+        }
+        dir = null;
+      }
+    }
+    this.resources = resources;
+    return dir;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 8795cfc..27799a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import org.apache.hive.common.util.Ref;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,9 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import javax.annotation.Nullable;
-
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -67,7 +65,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.tez.client.CallerContext;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.CounterGroup;
@@ -87,7 +84,6 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.json.JSONObject;
-
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -133,7 +129,7 @@ public class TezTask extends Task<TezWork> {
     int rc = 1;
     boolean cleanContext = false;
     Context ctx = null;
-    TezSessionState session = null;
+    Ref<TezSessionState> sessionRef = Ref.from(null);
 
     try {
       // Get or create Context object. If we create it we have to clean it later as well.
@@ -147,15 +143,15 @@ public class TezTask extends Task<TezWork> {
         WmContext wmContext = new WmContext(System.currentTimeMillis(), queryId);
         ctx.setWmContext(wmContext);
       }
-
       // Need to remove this static hack. But this is the way currently to get a session.
       SessionState ss = SessionState.get();
       // Note: given that we return pool sessions to the pool in the finally block below, and that
       //       we need to set the global to null to do that, this "reuse" may be pointless.
-      session = ss.getTezSession();
+      TezSessionState session = sessionRef.value = ss.getTezSession();
       if (session != null && !session.isOpen()) {
         LOG.warn("The session: " + session + " has not been opened");
       }
+
       // We only need a username for UGI to use for groups; getGroups will fetch the groups
       // based on Hadoop configuration, as documented at
       // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
@@ -163,57 +159,51 @@ public class TezTask extends Task<TezWork> {
       MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
         : new MappingInput(ss.getUserName(),
             UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
+
       WmContext wmContext = ctx.getWmContext();
-      session = WorkloadManagerFederation.getSession(session, conf, mi, getWork().getLlapMode(), wmContext);
+      // jobConf will hold all the configuration for hadoop, tez, and hive
+      JobConf jobConf = utils.createConfiguration(conf);
+      // Get all user jars from work (e.g. input format stuff).
+      String[] allNonConfFiles = work.configureJobConfAndExtractJars(jobConf);
+      // DAG scratch dir. We get a session from the pool so it may be different from Tez one.
+      // TODO: we could perhaps reuse the same directory for HiveResources?
+      Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), conf);
+      CallerContext callerContext = CallerContext.create(
+          "HIVE", queryPlan.getQueryId(), "HIVE_QUERY_ID", queryPlan.getQueryStr());
+
+      session = sessionRef.value = WorkloadManagerFederation.getSession(
+          sessionRef.value, conf, mi, getWork().getLlapMode(), wmContext);
 
-      LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), wmContext.getQueryId());
-      ss.setTezSession(session);
       try {
-        // jobConf will hold all the configuration for hadoop, tez, and hive
-        JobConf jobConf = utils.createConfiguration(conf);
-
-        // Get all user jars from work (e.g. input format stuff).
-        String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf);
-
-        // we will localize all the files (jars, plans, hashtables) to the
-        // scratch dir. let's create this and tmp first.
-        Path scratchDir = ctx.getMRScratchDir();
-
-        // create the tez tmp dir
-        scratchDir = utils.createTezDir(scratchDir, conf);
+        ss.setTezSession(session);
+        LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(),
+          wmContext.getQueryId());
 
-        // This is used to compare global and vertex resources. Global resources are originally
-        // derived from session conf via localizeTempFilesFromConf. So, use that here.
-        Configuration sessionConf = (session.getConf() != null) ? session.getConf() : conf;
-        Map<String,LocalResource> inputOutputLocalResources =
-            getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf);
+        // Ensure the session is open and has the necessary local resources.
+        // This would refresh any conf resources and also local resources.
+        ensureSessionHasResources(session, allNonConfFiles);
 
-        // Ensure the session is open and has the necessary local resources
-        updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
+        // This is a combination of the jar stuff from conf, and not from conf.
+        List<LocalResource> allNonAppResources = session.getLocalizedResources();
+        logResources(allNonAppResources);
 
-        List<LocalResource> additionalLr = session.getLocalizedResources();
-        logResources(additionalLr);
-
-        // unless already installed on all the cluster nodes, we'll have to
-        // localize hive-exec.jar as well.
-        LocalResource appJarLr = session.getAppJarLr();
+        Map<String, LocalResource> allResources = DagUtils.createTezLrMap(
+            session.getAppJarLr(), allNonAppResources);
 
         // next we translate the TezWork to a Tez DAG
-        DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
-        CallerContext callerContext = CallerContext.create(
-            "HIVE", queryPlan.getQueryId(),
-            "HIVE_QUERY_ID", queryPlan.getQueryStr());
+        DAG dag = build(jobConf, work, scratchDir, ctx, allResources);
         dag.setCallerContext(callerContext);
 
-        // Add the extra resources to the dag
-        addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
+        // Note: we no longer call addTaskLocalFiles because all the resources are correctly
+        //       updated in the session resource lists now, and thus added to vertices.
+        //       If something breaks, dag.addTaskLocalFiles might need to be called here.
 
         // Check isShutdown opportunistically; it's never unset.
         if (this.isShutdown) {
           throw new HiveException("Operation cancelled");
         }
-        DAGClient dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
-            additionalLr, inputOutputJars, inputOutputLocalResources);
+        DAGClient dagClient = submit(jobConf, dag, sessionRef);
+        session = sessionRef.value;
         boolean wasShutdown = false;
         synchronized (dagClientLock) {
           assert this.dagClient == null;
@@ -251,7 +241,9 @@ public class TezTask extends Task<TezWork> {
         // We return this to the pool even if it's unusable; reopen is supposed to handle this.
         wmContext = ctx.getWmContext();
         try {
-          session.returnToSessionManager();
+          if (sessionRef.value != null) {
+            sessionRef.value.returnToSessionManager();
+          }
         } catch (Exception e) {
           LOG.error("Failed to return session: {} to pool", session, e);
           throw e;
@@ -340,61 +332,23 @@ public class TezTask extends Task<TezWork> {
   }
 
   /**
-   * Converted the list of jars into local resources
-   */
-  Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
-      String[] inputOutputJars, Configuration sessionConf) throws Exception {
-    final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
-    // Skip the files already in session local resources...
-    final List<LocalResource> localResources = utils.localizeTempFiles(scratchDir.toString(),
-        jobConf, inputOutputJars, DagUtils.getTempFilesFromConf(sessionConf));
-    if (null != localResources) {
-      for (LocalResource lr : localResources) {
-        resources.put(utils.getBaseName(lr), lr);
-      }
-    }
-    return resources;
-  }
-
-  /**
    * Ensures that the Tez Session is open and the AM has all necessary jars configured.
    */
-  void updateSession(TezSessionState session,
-      JobConf jobConf, Path scratchDir, String[] inputOutputJars,
-      Map<String,LocalResource> extraResources) throws Exception {
-    final boolean missingLocalResources = !session
-        .hasResources(inputOutputJars);
-
+  @VisibleForTesting
+  void ensureSessionHasResources(
+      TezSessionState session, String[] nonConfResources) throws Exception {
     TezClient client = session.getSession();
     // TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ?
     if (client == null) {
+      // Note: the only sane case where this can happen is the non-pool one. We should get rid
+      //       of it, in non-pool case perf doesn't matter so we might as well open at get time
+      //       and then call update like we do in the else.
       // Can happen if the user sets the tez flag after the session was established.
       LOG.info("Tez session hasn't been created yet. Opening session");
-      session.open(inputOutputJars);
+      session.open(nonConfResources);
     } else {
       LOG.info("Session is already open");
-
-      // Ensure the open session has the necessary resources (StorageHandler)
-      if (missingLocalResources) {
-        LOG.info("Tez session missing resources," +
-            " adding additional necessary resources");
-        client.addAppMasterLocalFiles(extraResources);
-      }
-
-      session.refreshLocalResourcesFromConf(conf);
-    }
-  }
-
-  /**
-   * Adds any necessary resources that must be localized in each vertex to the DAG.
-   */
-  void addExtraResourcesToDag(TezSessionState session, DAG dag,
-      String[] inputOutputJars,
-      Map<String,LocalResource> inputOutputLocalResources) throws Exception {
-    if (!session.hasResources(inputOutputJars)) {
-      if (null != inputOutputLocalResources) {
-        dag.addTaskLocalFiles(inputOutputLocalResources);
-      }
+      session.ensureLocalResources(conf, nonConfResources);
     }
   }
 
@@ -406,9 +360,8 @@ public class TezTask extends Task<TezWork> {
     }
   }
 
-  DAG build(JobConf conf, TezWork work, Path scratchDir,
-      LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
-      throws Exception {
+  DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
+      Map<String, LocalResource> vertexResources) throws Exception {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
 
@@ -426,7 +379,7 @@ public class TezTask extends Task<TezWork> {
     DAG dag = DAG.create(dagName);
 
     // set some info for the query
-    JSONObject json = new JSONObject(new LinkedHashMap()).put("context", "Hive")
+    JSONObject json = new JSONObject(new LinkedHashMap<>()).put("context", "Hive")
         .put("description", ctx.getCmd());
     String dagInfo = json.toString();
 
@@ -474,7 +427,6 @@ public class TezTask extends Task<TezWork> {
 
         // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
         // Pick any one source vertex to figure out the Edge configuration.
-       
 
         // now hook up the children
         for (BaseWork v: children) {
@@ -488,9 +440,8 @@ public class TezTask extends Task<TezWork> {
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
         checkOutputSpec(w, wxConf);
-        Vertex wx =
-            utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
-                work, work.getVertexType(w));
+        Vertex wx = utils.createVertex(wxConf, w, scratchDir, fs, ctx, !isFinal,
+            work, work.getVertexType(w), vertexResources);
         if (w.getReservedMemoryMB() > 0) {
           // If reversedMemoryMB is set, make memory allocation fraction adjustment as needed
           double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);
@@ -548,38 +499,28 @@ public class TezTask extends Task<TezWork> {
     dag.setAccessControls(ac);
   }
 
-  DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
-      LocalResource appJarLr, TezSessionState sessionState,
-      List<LocalResource> additionalLr, String[] inputOutputJars,
-      Map<String,LocalResource> inputOutputLocalResources)
-      throws Exception {
+  /**
+   * Obtains a replacement for a session whose DAG submission failed, by reopening it.
+   *
+   * @param oldSession the session whose submission failed.
+   * @return the session to retry the submission with.
+   * @throws Exception if the session cannot be reopened.
+   */
+  private TezSessionState getNewTezSessionOnError(
+      TezSessionState oldSession) throws Exception {
+    // Note: we don't pass the config to reopen. If the session was already open, it would
+    //       have kept running with its current config - preserve that behavior.
+    // No console output here: submit() prints "Session re-established." itself after the
+    // SessionNotRunning reopen, so printing here as well would duplicate that message.
+    return oldSession.reopen();
+  }
+
+  DAGClient submit(JobConf conf, DAG dag, Ref<TezSessionState> sessionStateRef) throws Exception {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
     DAGClient dagClient = null;
-
-    Map<String, LocalResource> resourceMap = new HashMap<String, LocalResource>();
-    if (additionalLr != null) {
-      for (LocalResource lr: additionalLr) {
-        if (lr.getType() == LocalResourceType.FILE) {
-          // TEZ AM will only localize FILE (no script operators in the AM)
-          resourceMap.put(utils.getBaseName(lr), lr);
-        }
-      }
-    }
-
+    TezSessionState sessionState = sessionStateRef.value;
     try {
       try {
         // ready to start execution on the cluster
-        sessionState.getSession().addAppMasterLocalFiles(resourceMap);
         dagClient = sessionState.getSession().submitDAG(dag);
       } catch (SessionNotRunning nr) {
         console.printInfo("Tez session was closed. Reopening...");
-
-        // close the old one, but keep the tmp files around
-        // conf is passed in only for the case when session conf is null (tests and legacy paths?)
-        sessionState = sessionState.reopen(conf, inputOutputJars);
+        sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
         console.printInfo("Session re-established.");
-
         dagClient = sessionState.getSession().submitDAG(dag);
       }
     } catch (Exception e) {
@@ -587,14 +528,12 @@ public class TezTask extends Task<TezWork> {
       try {
         console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
             + Arrays.toString(e.getStackTrace()) + " retrying...");
-        // TODO: this is temporary, need to refactor how reopen is invoked.
-        WmContext oldCtx = sessionState.getWmContext();
-        sessionState = sessionState.reopen(conf, inputOutputJars);
-        sessionState.setWmContext(oldCtx);
+        sessionStateRef.value = sessionState = getNewTezSessionOnError(sessionState);
         dagClient = sessionState.getSession().submitDAG(dag);
       } catch (Exception retryException) {
         // we failed to submit after retrying. Destroy session and bail.
         sessionState.destroy();
+        sessionStateRef.value = null;
         throw retryException;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index dbdbbf2..1f4843d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,9 +47,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
@@ -50,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
 import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.exec.tez.AmPluginNode.AmPluginInfo;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState.HiveResources;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
@@ -67,15 +75,6 @@ import org.codehaus.jackson.map.SerializationConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 
 /** Workload management entry point for HS2.
  * Note on how this class operates.
@@ -342,6 +341,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private List<WmTezSession> toRestartInUse = new LinkedList<>(),
         toDestroyNoRestart = new LinkedList<>();
     private Map<WmTezSession, KillQueryContext> toKillQuery = new IdentityHashMap<>();
+    private List<Path> pathsToDelete = Lists.newArrayList();
   }
 
   private void runWmThread() {
@@ -440,7 +440,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         try {
           WmEvent wmEvent = new WmEvent(WmEvent.EventType.RESTART);
           // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
-          tezAmPool.replaceSession(toRestart, false, null);
+          tezAmPool.replaceSession(toRestart);
           wmEvent.endEvent(toRestart);
         } catch (Exception ex) {
           LOG.error("Failed to restart an old session; ignoring", ex);
@@ -463,6 +463,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       });
     }
     context.toDestroyNoRestart.clear();
+
+    // 4. Delete unneeded directories that were replaced by other ones via reopen.
+    //    Each delete is submitted to workPool and is best-effort: failures are logged, not thrown.
+    for (final Path path : context.pathsToDelete) {
+      LOG.info("Deleting {}", path);
+      workPool.submit(() -> {
+        try {
+          path.getFileSystem(conf).delete(path, true);
+        } catch (Exception ex) {
+          // Include the path and the cause so the leaked directory can be found and diagnosed.
+          LOG.error("Failed to delete an old path {}; ignoring", path, ex);
+        }
+      });
+    }
+    context.pathsToDelete.clear();
   }
 
   /**
@@ -654,7 +667,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       if (LOG.isDebugEnabled()) {
         LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
       }
-      processPoolChangesOnMasterThread(poolName, hasRequeues);
+      processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork);
     }
 
 
@@ -852,7 +865,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     case OK:
       // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
       PoolState pool = pools.get(poolName);
-      SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(), session.getWmContext());
+      SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId(),
+          session.getWmContext(), session.extractHiveResources());
       // We have just removed the session from the same pool, so don't check concurrency here.
       pool.initializingSessions.add(sw);
       ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
@@ -953,7 +967,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           state = new PoolState(fullName, qp, fraction);
         } else {
           // This will also take care of the queries if query parallelism changed.
-          state.update(qp, fraction, syncWork.toKillQuery, e);
+          state.update(qp, fraction, syncWork, e);
           poolsToRedistribute.add(fullName);
         }
         state.setTriggers(new LinkedList<Trigger>());
@@ -988,7 +1002,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     if (oldPools != null && !oldPools.isEmpty()) {
       // Looks like some pools were removed; kill running queries, re-queue the queued ones.
       for (PoolState oldPool : oldPools.values()) {
-        oldPool.destroy(syncWork.toKillQuery, e.getRequests, e.toReuse);
+        oldPool.destroy(syncWork, e.getRequests, e.toReuse);
       }
     }
 
@@ -1027,7 +1041,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return deltaSessions + toTransfer;
   }
 
-  @SuppressWarnings("unchecked")
   private void failOnFutureFailure(ListenableFuture<?> future) {
     Futures.addCallback(future, FATAL_ERROR_CALLBACK);
   }
@@ -1088,7 +1101,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   }
 
 
-  private void processPoolChangesOnMasterThread(String poolName, boolean hasRequeues) throws Exception {
+  private void processPoolChangesOnMasterThread(
+      String poolName, boolean hasRequeues, WmThreadSyncWork syncWork) throws Exception {
     PoolState pool = pools.get(poolName);
     if (pool == null) return; // Might be from before the new resource plan.
 
@@ -1109,15 +1123,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // Note that in theory, we are guaranteed to have a session waiting for us here, but
       // the expiration, failures, etc. may cause one to be missing pending restart.
       // See SessionInitContext javadoc.
-      SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId,
-        queueReq.wmContext);
+      SessionInitContext sw = new SessionInitContext(
+          queueReq.future, poolName, queueReq.queryId, queueReq.wmContext, null);
       ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
       Futures.addCallback(getFuture, sw);
       // It is possible that all the async methods returned on the same thread because the
       // session with registry data and stuff was available in the pool.
       // If this happens, we'll take the session out here and "cancel" the init so we skip
       // processing the message that the successful init has queued for us.
-      boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions);
+      boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions, syncWork.pathsToDelete);
       if (!isDone) {
         pool.initializingSessions.add(sw);
       }
@@ -1458,22 +1472,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
 
   @Override
-  public TezSessionState reopen(TezSessionState session, Configuration conf,
-    String[] additionalFiles) throws Exception {
+  public TezSessionState reopen(TezSessionState session) throws Exception {
     WmTezSession wmTezSession = ensureOwnedSession(session);
     HiveConf sessionConf = wmTezSession.getConf();
     if (sessionConf == null) {
+      // TODO: can this ever happen?
       LOG.warn("Session configuration is null for " + wmTezSession);
       sessionConf = new HiveConf(conf, WorkloadManager.class);
     }
-    // TODO: ideally, we should handle reopen the same way no matter what. However, the cases
-    //       with additional files will have to wait until HIVE-17827 is unfucked, because it's
-    //       difficult to determine how the additionalFiles are to be propagated/reused between
-    //       two sessions. Once the update logic is encapsulated in the session we can remove this.
-    if (additionalFiles != null && additionalFiles.length > 0) {
-      TezSessionPoolManager.reopenInternal(session, additionalFiles);
-      return session;
-    }
 
     SettableFuture<WmTezSession> future = SettableFuture.create();
     currentLock.lock();
@@ -1493,7 +1499,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   public void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception {
     // By definition, this session is not in use and can no longer be in use, so it only
     // affects the session pool. We can handle this inline.
-    tezAmPool.replaceSession(ensureOwnedSession(session), false, null);
+    tezAmPool.replaceSession(ensureOwnedSession(session));
   }
 
   // ======= VARIOUS UTILITY METHOD
@@ -1637,7 +1643,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
 
     public void update(int queryParallelism, double fraction,
-        Map<WmTezSession, KillQueryContext> toKill, EventState e) {
+        WmThreadSyncWork syncWork, EventState e) {
       this.finalFraction = this.finalFractionRemaining = fraction;
       this.queryParallelism = queryParallelism;
       // TODO: two possible improvements
@@ -1646,7 +1652,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       //          If we could somehow restart queries we could instead put them at the front
       //          of the queue (esp. in conjunction with (1)) and rerun them.
       if (queryParallelism < getTotalActiveSessions()) {
-        extractAllSessionsToKill("The query pool was resized by administrator", e.toReuse, toKill);
+        extractAllSessionsToKill("The query pool was resized by administrator",
+            e.toReuse, syncWork);
       }
       // We will requeue, and not kill, the queries that are not running yet.
       // Insert them all before the get requests from this iteration.
@@ -1656,9 +1663,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       }
     }
 
-    public void destroy(Map<WmTezSession, KillQueryContext> toKill,
+    public void destroy(WmThreadSyncWork syncWork,
         LinkedList<GetRequest> globalQueue, IdentityHashMap<WmTezSession, GetRequest> toReuse) {
-      extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill);
+      extractAllSessionsToKill("The query pool was removed by administrator", toReuse, syncWork);
       // All the pending get requests should just be requeued elsewhere.
       // Note that we never queue session reuse so sessionToReuse would be null.
       globalQueue.addAll(0, queue);
@@ -1694,19 +1701,22 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
     private void extractAllSessionsToKill(String killReason,
         IdentityHashMap<WmTezSession, GetRequest> toReuse,
-        Map<WmTezSession, KillQueryContext> toKill) {
+        WmThreadSyncWork syncWork) {
       for (WmTezSession sessionToKill : sessions) {
-        resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse);
+        resetRemovedSessionToKill(syncWork.toKillQuery,
+          new KillQueryContext(sessionToKill, killReason), toReuse);
       }
       sessions.clear();
       for (SessionInitContext initCtx : initializingSessions) {
         // It is possible that the background init thread has finished in parallel, queued
         // the message for us but also returned the session to the user.
-        WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone(killReason);
+        WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone(
+            killReason, syncWork.pathsToDelete);
         if (sessionToKill == null) {
           continue; // Async op in progress; the callback will take care of this.
         }
-        resetRemovedSessionToKill(toKill, new KillQueryContext(sessionToKill, killReason), toReuse);
+        resetRemovedSessionToKill(syncWork.toKillQuery,
+          new KillQueryContext(sessionToKill, killReason), toReuse);
       }
       initializingSessions.clear();
     }
@@ -1740,14 +1750,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private SettableFuture<WmTezSession> future;
     private SessionInitState state;
     private String cancelReason;
+    private HiveResources prelocalizedResources;
+    private Path pathToDelete;
     private WmContext wmContext;
 
-    public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId,
-      final WmContext wmContext) {
+    public SessionInitContext(SettableFuture<WmTezSession> future,
+        String poolName, String queryId, WmContext wmContext,
+        HiveResources prelocalizedResources) {
       this.state = SessionInitState.GETTING;
       this.future = future;
       this.poolName = poolName;
       this.queryId = queryId;
+      this.prelocalizedResources = prelocalizedResources;
       this.wmContext = wmContext;
     }
 
@@ -1765,7 +1779,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           session.setPoolName(poolName);
           session.setQueueName(yarnQueue);
           session.setQueryId(queryId);
-          session.setWmContext(wmContext);
+          if (prelocalizedResources != null) {
+            pathToDelete = session.replaceHiveResources(prelocalizedResources, true);
+          }
+          if (wmContext != null) {
+            session.setWmContext(wmContext);
+          }
           this.session = session;
           this.state = SessionInitState.WAITING_FOR_REGISTRY;
           break;
@@ -1855,7 +1874,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         session.setQueryId(null);
         // We can just restart the session if we have received one.
         try {
-          tezAmPool.replaceSession(session, false, null);
+          tezAmPool.replaceSession(session);
         } catch (Exception e) {
           LOG.error("Failed to restart a failed session", e);
         }
@@ -1863,7 +1882,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
 
     /** Cancel the async operation (even if it's done), and return the session if done. */
-    public WmTezSession cancelAndExtractSessionIfDone(String cancelReason) {
+    public WmTezSession cancelAndExtractSessionIfDone(String cancelReason, List<Path> toDelete) {
       lock.lock();
       try {
         SessionInitState state = this.state;
@@ -1872,6 +1891,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         if (state == SessionInitState.DONE) {
           WmTezSession result = this.session;
           this.session = null;
+          if (pathToDelete != null) {
+            toDelete.add(pathToDelete);
+          }
           return result;
         } else {
           // In the states where a background operation is in progress, wait for the callback.
@@ -1887,11 +1909,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
 
     /** Extracts the session and cancel the operation, both only if done. */
-    public boolean extractSessionAndCancelIfDone(List<WmTezSession> results) {
+    public boolean extractSessionAndCancelIfDone(
+        List<WmTezSession> results, List<Path> toDelete) {
       lock.lock();
       try {
         if (state != SessionInitState.DONE) return false;
         this.state = SessionInitState.CANCELED;
+        if (pathToDelete != null) {
+          toDelete.add(pathToDelete);
+        }
         if (this.session != null) {
           results.add(this.session);
         } // Otherwise we have failed; the callback has taken care of the failure.

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index 9726af1..5ade1f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -92,7 +92,7 @@ public class TezJobMonitor {
         try {
           // TODO: why does this only kill non-default sessions?
           // Nothing for workload management since that only deals with default ones.
-          TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
+          TezSessionPoolManager.getInstance().closeNonDefaultSessions();
         } catch (Exception e) {
           // ignore
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 4148a8a..d6ae171 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -356,9 +356,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       // Update the queryId to use the generated applicationId. See comment below about
       // why this is done.
       HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
-      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
-          new ArrayList<LocalResource>(), fs, ctx, false, work,
-          work.getVertexType(mapWork));
+      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, fs, ctx, false, work,
+          work.getVertexType(mapWork), DagUtils.createTezLrMap(appJarLr, null));
       String vertexName = wx.getName();
       dag.addVertex(wx);
       utils.addCredentials(mapWork, dag);

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index 5248454..0a47cda 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -20,16 +20,12 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 
 import com.google.common.util.concurrent.Futures;
-
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.concurrent.ScheduledExecutorService;
 import javax.security.auth.login.LoginException;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -83,8 +79,7 @@ public class SampleTezSessionState extends WmTezSession {
   }
 
   @Override
-  public void open(Collection<String> additionalFiles, Path scratchDir)
-      throws LoginException, IOException {
+  public void open(HiveResources resources) throws LoginException, IOException {
     open();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index 829ea8c..8fbe9a7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -20,20 +20,16 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import static org.junit.Assert.*;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
-
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 public class TestTezSessionPool {
 
@@ -190,22 +186,22 @@ public class TestTezSessionPool {
       Mockito.when(session.isDefault()).thenReturn(false);
       Mockito.when(session.getConf()).thenReturn(conf);
 
-      poolManager.reopen(session, conf, null);
+      poolManager.reopen(session);
 
-      Mockito.verify(session).close(true);
-      Mockito.verify(session).open(new HashSet<String>(), null);
+      Mockito.verify(session).close(false);
+      Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any());
 
       // mocked session starts with default queue
       assertEquals("default", session.getQueueName());
 
       // user explicitly specified queue name
       conf.set("tez.queue.name", "tezq1");
-      poolManager.reopen(session, conf, null);
+      poolManager.reopen(session);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
 
       // user unsets queue name, will fallback to default session queue
       conf.unset("tez.queue.name");
-      poolManager.reopen(session, conf, null);
+      poolManager.reopen(session);
       assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName());
 
       // session.open will unset the queue name from conf but Mockito intercepts the open call
@@ -213,17 +209,17 @@ public class TestTezSessionPool {
       conf.unset("tez.queue.name");
       // change session's default queue to tezq1 and rerun test sequence
       Mockito.when(session.getQueueName()).thenReturn("tezq1");
-      poolManager.reopen(session, conf, null);
+      poolManager.reopen(session);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
 
       // user sets default queue now
       conf.set("tez.queue.name", "default");
-      poolManager.reopen(session, conf, null);
+      poolManager.reopen(session);
       assertEquals("default", poolManager.getSession(null, conf, false, false).getQueueName());
 
       // user does not specify queue so use session default
       conf.unset("tez.queue.name");
-      poolManager.reopen(session, conf, null);
+      poolManager.reopen(session);
       assertEquals("tezq1", poolManager.getSession(null, conf, false, false).getQueueName());
     } catch (Exception e) {
       e.printStackTrace();
@@ -328,10 +324,10 @@ public class TestTezSessionPool {
     Mockito.when(session.isDefault()).thenReturn(false);
     Mockito.when(session.getConf()).thenReturn(conf);
 
-    poolManager.reopen(session, conf, null);
+    poolManager.reopen(session);
 
-    Mockito.verify(session).close(true);
-    Mockito.verify(session).open(new HashSet<String>(), null);
+    Mockito.verify(session).close(false);
+    Mockito.verify(session).open(Mockito.<TezSessionState.HiveResources>any());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 47aa936..44d2b66 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -23,16 +23,16 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hive.common.util.Ref;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -96,8 +96,8 @@ public class TestTezTask {
     when(utils.getTezDir(any(Path.class))).thenReturn(path);
     when(
         utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
-            any(LocalResource.class), any(List.class), any(FileSystem.class), any(Context.class),
-            anyBoolean(), any(TezWork.class), any(VertexType.class))).thenAnswer(
+            any(FileSystem.class), any(Context.class),
+            anyBoolean(), any(TezWork.class), any(VertexType.class), any(Map.class))).thenAnswer(
         new Answer<Vertex>() {
 
           @Override
@@ -163,7 +163,7 @@ public class TestTezTask {
     task.setQueryPlan(mockQueryPlan);
 
     conf = new JobConf();
-    appLr = mock(LocalResource.class);
+    appLr = createResource("foo.jar");
 
     HiveConf hiveConf = new HiveConf();
     hiveConf
@@ -173,8 +173,7 @@ public class TestTezTask {
     session = mock(TezClient.class);
     sessionState = mock(TezSessionState.class);
     when(sessionState.getSession()).thenReturn(session);
-    when(sessionState.reopen(any(Configuration.class), any(String[].class)))
-      .thenReturn(sessionState);
+    when(sessionState.reopen()).thenReturn(sessionState);
     when(session.submitDAG(any(DAG.class)))
       .thenThrow(new SessionNotRunning(""))
       .thenReturn(mock(DAGClient.class));
@@ -192,7 +191,8 @@ public class TestTezTask {
 
   @Test
   public void testBuildDag() throws IllegalArgumentException, IOException, Exception {
-    DAG dag = task.build(conf, work, path, appLr, null, new Context(conf));
+    DAG dag = task.build(conf, work, path, new Context(conf),
+        DagUtils.createTezLrMap(appLr, null));
     for (BaseWork w: work.getAllWork()) {
       Vertex v = dag.getVertex(w.getName());
       assertNotNull(v);
@@ -212,17 +212,17 @@ public class TestTezTask {
 
   @Test
   public void testEmptyWork() throws IllegalArgumentException, IOException, Exception {
-    DAG dag = task.build(conf, new TezWork("", null), path, appLr, null, new Context(conf));
+    DAG dag = task.build(conf, new TezWork("", null), path, new Context(conf),
+        DagUtils.createTezLrMap(appLr, null));
     assertEquals(dag.getVertices().size(), 0);
   }
 
   @Test
   public void testSubmit() throws Exception {
     DAG dag = DAG.create("test");
-    task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(),
-        new String[0], Collections.<String,LocalResource> emptyMap());
+    task.submit(conf, dag, Ref.from(sessionState));
     // validate close/reopen
-    verify(sessionState, times(1)).reopen(any(Configuration.class), any(String[].class));
+    verify(sessionState, times(1)).reopen();
     verify(session, times(2)).submitDAG(any(DAG.class));
   }
 
@@ -235,53 +235,22 @@ public class TestTezTask {
   @Test
   public void testExistingSessionGetsStorageHandlerResources() throws Exception {
     final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
-    LocalResource res = mock(LocalResource.class);
-    final List<LocalResource> resources = Collections.singletonList(res);
-    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
-    resMap.put("foo.jar", res);
-
-    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
-        .thenReturn(resources);
-    when(utils.getBaseName(res)).thenReturn("foo.jar");
-    when(sessionState.isOpen()).thenReturn(true);
-    when(sessionState.isOpening()).thenReturn(false);
-    when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
-    task.updateSession(sessionState, conf, path, inputOutputJars, resMap);
-    verify(session).addAppMasterLocalFiles(resMap);
-  }
-
-  @Test
-  public void testExtraResourcesAddedToDag() throws Exception {
-    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
-    LocalResource res = mock(LocalResource.class);
+    LocalResource res = createResource(inputOutputJars[0]);
     final List<LocalResource> resources = Collections.singletonList(res);
-    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
-    resMap.put("foo.jar", res);
-    DAG dag = mock(DAG.class);
 
-    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
-        .thenReturn(resources);
-    when(utils.getBaseName(res)).thenReturn("foo.jar");
+    when(utils.localizeTempFiles(anyString(), any(Configuration.class), eq(inputOutputJars),
+        any(String[].class))).thenReturn(resources);
     when(sessionState.isOpen()).thenReturn(true);
     when(sessionState.isOpening()).thenReturn(false);
-    when(sessionState.hasResources(inputOutputJars)).thenReturn(false);
-    task.addExtraResourcesToDag(sessionState, dag, inputOutputJars, resMap);
-    verify(dag).addTaskLocalFiles(resMap);
+    task.ensureSessionHasResources(sessionState, inputOutputJars);
+    // TODO: ideally we should have a test for session itself.
+    verify(sessionState).ensureLocalResources(any(Configuration.class), eq(inputOutputJars));
   }
 
-  @Test
-  public void testGetExtraLocalResources() throws Exception {
-    final String[] inputOutputJars = new String[] {"file:///tmp/foo.jar"};
+  private static LocalResource createResource(String url) {
     LocalResource res = mock(LocalResource.class);
-    final List<LocalResource> resources = Collections.singletonList(res);
-    final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
-    resMap.put("foo.jar", res);
-
-    when(utils.localizeTempFiles(eq(path.toString()), eq(conf), eq(inputOutputJars),
-        Mockito.<String[]>any())).thenReturn(resources);
-    when(utils.getBaseName(res)).thenReturn("foo.jar");
-
-    assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars, null));
+    when(res.getResource()).thenReturn(URL.fromPath(new Path(url)));
+    return res;
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/89dbf4e9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index c58e450..fc8f66a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -42,7 +42,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
@@ -212,9 +211,8 @@ public class TestWorkloadManager {
     }
 
     @Override
-    public TezSessionState reopen(
-        TezSessionState session, Configuration conf, String[] additionalFiles) throws Exception {
-      session = super.reopen(session, conf, additionalFiles);
+    public TezSessionState reopen(TezSessionState session) throws Exception {
+      session = super.reopen(session);
       ensureWm();
       return session;
     }
@@ -274,7 +272,7 @@ public class TestWorkloadManager {
         null, new MappingInput("user", null), conf, null);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
-    WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
+    WmTezSession session2 = (WmTezSession) session.reopen();
     assertNotSame(session, session2);
     wm.addTestEvent().get();
     assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON);
@@ -682,7 +680,7 @@ public class TestWorkloadManager {
     waitForThreadToBlock(cdl1, t1);
     checkError(error);
     // Replacing it directly in the pool should unblock get.
-    pool.replaceSession(oob, false, null);
+    pool.replaceSession(oob);
     t1.join();
     assertNotNull(sessionA1.get());
     assertEquals("A", sessionA1.get().getPoolName());


[2/2] hive git commit: HIVE-18003 : add explicit jdbc connection string args for mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Posted by se...@apache.org.
HIVE-18003 : add explicit jdbc connection string args for mappings (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e120bd8b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e120bd8b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e120bd8b

Branch: refs/heads/master
Commit: e120bd8b0b8b74516651f3ae9e4e7d3a170b0d4d
Parents: 89dbf4e
Author: sergey <se...@apache.org>
Authored: Thu Dec 14 15:55:25 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Thu Dec 14 15:55:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +-
 .../org/apache/hive/jdbc/HiveConnection.java    |   5 +
 jdbc/src/java/org/apache/hive/jdbc/Utils.java   |   1 +
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |  10 +-
 .../hive/ql/exec/tez/UserPoolMapping.java       |  38 +++++-
 .../hive/ql/exec/tez/WorkloadManager.java       |  30 +++--
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 117 ++++++++++++-------
 7 files changed, 147 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7a81612..711dfbd 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2435,9 +2435,13 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
         "A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
         "workload management is enabled and used for these sessions."),
-    HIVE_SERVER2_TEZ_WM_WORKER_THREADS("hive.server2.tez.wm.worker.threads", 4,
+    HIVE_SERVER2_WM_WORKER_THREADS("hive.server2.wm.worker.threads", 4,
         "Number of worker threads to use to perform the synchronous operations with Tez\n" +
         "sessions for workload management (e.g. opening, closing, etc.)"),
+    HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC("hive.server2.wm.allow.any.pool.via.jdbc", false,
+        "Applies when a user specifies a target WM pool in the JDBC connection string. If\n" +
+        "false, the user can only specify a pool he is mapped to (e.g. make a choice among\n" +
+        "multiple group mappings); if true, the user can specify any existing pool."),
     HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
         new TimeValidator(TimeUnit.SECONDS),
         "The timeout for AM registry registration, after which (on attempting to use the\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
index fc937e6..45acf13 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java
@@ -138,6 +138,7 @@ public class HiveConnection implements java.sql.Connection {
   private TProtocolVersion protocol;
   private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE;
   private String initFile = null;
+  private String wmPool = null;
   private Properties clientInfo;
 
   /**
@@ -178,6 +179,7 @@ public class HiveConnection implements java.sql.Connection {
     if (sessConfMap.containsKey(JdbcConnectionParams.INIT_FILE)) {
       initFile = sessConfMap.get(JdbcConnectionParams.INIT_FILE);
     }
+    wmPool = sessConfMap.get(JdbcConnectionParams.WM_POOL);
 
     // add supported protocols
     supportedProtocols.add(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1);
@@ -680,6 +682,9 @@ public class HiveConnection implements java.sql.Connection {
     // set the fetchSize
     openConf.put("set:hiveconf:hive.server2.thrift.resultset.default.fetch.size",
       Integer.toString(fetchSize));
+    if (wmPool != null) {
+      openConf.put("set:hivevar:wmpool", wmPool);
+    }
 
     // set the session configuration
     Map<String, String> sessVars = connParams.getSessionVars();

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
index 855de88..bb13682 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java
@@ -123,6 +123,7 @@ public class Utils {
     // Set the fetchSize
     static final String FETCH_SIZE = "fetchSize";
     static final String INIT_FILE = "initFile";
+    static final String WM_POOL = "wmPool";
 
     // --------------- Begin 2 way ssl options -------------------------
     // Use two way ssl. This param will take effect only when ssl=true

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 27799a8..1a24b44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -156,9 +156,13 @@ public class TezTask extends Task<TezWork> {
       // based on Hadoop configuration, as documented at
       // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html
       String userName = ss.getUserName();
-      MappingInput mi = (userName == null) ? new MappingInput("anonymous", null)
-        : new MappingInput(ss.getUserName(),
-            UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups());
+      List<String> groups = null;
+      if (userName == null) {
+        userName = "anonymous";
+      } else {
+        groups = UserGroupInformation.createRemoteUser(ss.getUserName()).getGroups();
+      }
+      MappingInput mi = new MappingInput(userName, groups, ss.getHiveVariables().get("wmpool"));
 
       WmContext wmContext = ctx.getWmContext();
       // jobConf will hold all the configuration for hadoop, tez, and hive

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 33ee8f7..5919f3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Set;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,13 +57,27 @@ class UserPoolMapping {
 
   /** Contains all the information necessary to map a query to a pool. */
   public static final class MappingInput {
     private final String userName;
+    /** Pool explicitly requested for the query, or null if none was requested. */
+    private final String wmPool;
     private final List<String> groups;
     // TODO: we may add app name, etc. later
 
-    public MappingInput(String userName, List<String> groups) {
+    public MappingInput(String userName, List<String> groups, String wmPool) {
       this.userName = userName;
       this.groups = groups;
+      this.wmPool = wmPool;
+    }
+
+    // TODO: move these into tests when there are fewer conflicting patches pending.
+    @VisibleForTesting
+    public MappingInput(String userName) {
+      this(userName, null);
+    }
+
+    @VisibleForTesting
+    public MappingInput(String userName, List<String> groups) {
+      this(userName, groups, null);
     }
 
     public List<String> getGroups() {
@@ -73,7 +90,7 @@ class UserPoolMapping {
 
     @Override
     public String toString() {
-      return getUserName() + "; groups " + groups;
+      return "{" + getUserName() + "; pool " + wmPool + "; groups " + groups + "}";
     }
   }
 
@@ -107,17 +124,44 @@ class UserPoolMapping {
     }
   }
 
-  public String mapSessionToPoolName(MappingInput input) {
+  /**
+   * Maps a query to the name of the pool it should run in.
+   *
+   * @param input user name, groups and the optional explicitly requested pool.
+   * @param allowAnyPool if true, an explicitly requested pool is honored even when no user or
+   *     group mapping points to it.
+   * @param pools pool names an explicit request is checked against when allowAnyPool is set;
+   *     null skips the check.
+   * @return the explicitly requested pool if it is allowed (it matches one of the user's or
+   *     groups' mappings, or allowAnyPool is set and it passes the pools check); if no pool was
+   *     requested, the pool of the user or group mapping with the lowest priority value (the
+   *     user mapping wins ties), else the default pool path; null if the request is not allowed.
+   */
+  public String mapSessionToPoolName(MappingInput input, boolean allowAnyPool, Set<String> pools) {
+    if (allowAnyPool && input.wmPool != null) {
+      return (pools == null || pools.contains(input.wmPool)) ? input.wmPool : null;
+    }
     // For equal-priority rules, user rules come first because they are more specific (arbitrary).
     Mapping mapping = userMappings.get(input.getUserName());
+    // An explicit request naming the user's own mapped pool is honored regardless of priority.
+    if (mapping != null && isExplicitPoolMatch(input, mapping)) {
+      return mapping.fullPoolName;
+    }
     for (String group : input.getGroups()) {
       Mapping candidate = groupMappings.get(group);
       if (candidate == null) continue;
+      if (isExplicitPoolMatch(input, candidate)) return candidate.fullPoolName;
       if (mapping == null || candidate.priority < mapping.priority) {
         mapping = candidate;
       }
     }
+    // Every explicit match has returned above, so a requested pool that no mapping allows fails.
+    if (input.wmPool != null) return null;
     if (mapping != null) return mapping.fullPoolName;
     return defaultPoolPath;
   }
+
+  private static boolean isExplicitPoolMatch(MappingInput input, Mapping mapping) {
+    return input.wmPool != null && input.wmPool.equals(mapping.fullPoolName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 1f4843d..f0481fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -103,6 +103,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private final QueryAllocationManager allocationManager;
   private final String yarnQueue;
   private final int amRegistryTimeoutMs;
+  /** Whether users may explicitly request any pool (HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC). */
+  private final boolean allowAnyPool;
   // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
   //       sessions, so the pool itself could internally track the sessions it gave out, since
   //       calling close on an unopened session is probably harmless.
@@ -204,11 +206,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // Only creates the expiration tracker if expiration is configured.
     expirationTracker = SessionExpirationTracker.create(conf, this);
 
-    workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS),
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Workload management worker %d").build());
+    workPool = Executors.newFixedThreadPool(
+        HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_WM_WORKER_THREADS),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Workload management worker %d").build());
 
-    timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
-      .setNameFormat("Workload management timeout thread").build());
+    timeoutPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("Workload management timeout thread").build());
+
+    allowAnyPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC);
 
     wmThread = new Thread(() -> runWmThread(), "Workload management master");
     wmThread.setDaemon(true);
@@ -1047,7 +1053,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
   private void queueGetRequestOnMasterThread(
       GetRequest req, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) {
-    String poolName = userPoolMapping.mapSessionToPoolName(req.mappingInput);
+    String poolName = userPoolMapping.mapSessionToPoolName(
+        req.mappingInput, allowAnyPool, allowAnyPool ? pools.keySet() : null);
     if (poolName == null) {
       req.future.setException(new NoPoolMappingException(
           "Cannot find any pool mapping for " + req.mappingInput));
@@ -1319,8 +1326,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  /** Same as the four-argument overload with a null WmContext; intended for tests. */
+  @VisibleForTesting
   public WmTezSession getSession(
-    TezSessionState session, MappingInput input, HiveConf conf, final WmContext wmContext) throws Exception {
+      TezSessionState session, MappingInput input, HiveConf conf) throws Exception {
+    return getSession(session, input, conf, null);
+  }
+
+  public WmTezSession getSession(TezSessionState session, MappingInput input, HiveConf conf,
+      final WmContext wmContext) throws Exception {
     WmEvent wmEvent = new WmEvent(WmEvent.EventType.GET);
     // Note: not actually used for pool sessions; verify some things like doAs are not set.
     validateConfig(conf);
@@ -1936,8 +1950,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
   boolean isManaged(MappingInput input) {
     // This is always replaced atomically, so we don't care about concurrency here.
-    if (userPoolMapping != null) {
-      String mappedPool = userPoolMapping.mapSessionToPoolName(input);
+    UserPoolMapping mapping = userPoolMapping;
+    if (mapping != null) {
+      // Don't pass in the pool set - not thread safe; if the user is trying to force us to
+      // use a non-existent pool, we want to fail anyway. We will fail later during get.
+      String mappedPool = mapping.mapSessionToPoolName(input, allowAnyPool, null);
       LOG.info("Mapping input: {} mapped to pool: {}", input, mappedPool);
       return true;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/e120bd8b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index fc8f66a..98f5c58 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -32,7 +32,6 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
 import java.lang.Thread.State;
@@ -86,7 +85,7 @@ public class TestWorkloadManager {
         cdl.countDown();
       }
       try {
-       session.set((WmTezSession) wm.getSession(old, new MappingInput(userName, null), conf, null));
+       session.set((WmTezSession) wm.getSession(old, new MappingInput(userName), conf));
       } catch (Throwable e) {
         error.compareAndSet(null, e);
       }
@@ -227,17 +226,17 @@ public class TestWorkloadManager {
     TezSessionState nonPool = mock(TezSessionState.class);
     when(nonPool.getConf()).thenReturn(conf);
     doNothing().when(nonPool).close(anyBoolean());
-    TezSessionState session = wm.getSession(nonPool, new MappingInput("user", null), conf, null);
+    TezSessionState session = wm.getSession(nonPool, new MappingInput("user"), conf);
     verify(nonPool).close(anyBoolean());
     assertNotSame(nonPool, session);
     session.returnToSessionManager();
     TezSessionPoolSession diffPool = mock(TezSessionPoolSession.class);
     when(diffPool.getConf()).thenReturn(conf);
     doNothing().when(diffPool).returnToSessionManager();
-    session = wm.getSession(diffPool, new MappingInput("user", null), conf, null);
+    session = wm.getSession(diffPool, new MappingInput("user"), conf);
     verify(diffPool).returnToSessionManager();
     assertNotSame(diffPool, session);
-    TezSessionState session2 = wm.getSession(session, new MappingInput("user", null), conf, null);
+    TezSessionState session2 = wm.getSession(session, new MappingInput("user"), conf);
     assertSame(session, session2);
   }
 
@@ -249,11 +248,11 @@ public class TestWorkloadManager {
     wm.start();
     // The queue should be ignored.
     conf.set(TezConfiguration.TEZ_QUEUE_NAME, "test2");
-    TezSessionState session = wm.getSession(null, new MappingInput("user", null), conf, null);
+    TezSessionState session = wm.getSession(null, new MappingInput("user"), conf);
     assertEquals("test", session.getQueueName());
     assertEquals("test", conf.get(TezConfiguration.TEZ_QUEUE_NAME));
     session.setQueueName("test2");
-    session = wm.getSession(session, new MappingInput("user", null), conf, null);
+    session = wm.getSession(session, new MappingInput("user"), conf);
     assertEquals("test", session.getQueueName());
   }
 
@@ -269,7 +268,7 @@ public class TestWorkloadManager {
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 1, qam);
     wm.start();
     WmTezSession session = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf, null);
+        null, new MappingInput("user"), conf);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
     WmTezSession session2 = (WmTezSession) session.reopen();
@@ -287,10 +286,10 @@ public class TestWorkloadManager {
     MockQam qam = new MockQam();
     WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
     wm.start();
-    WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null);
+    WmTezSession session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
     assertEquals(1.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
-    WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null);
+    WmTezSession session2 = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
     assertEquals(0.5, session.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
@@ -301,7 +300,7 @@ public class TestWorkloadManager {
     qam.assertWasCalledAndReset();
 
     // We never lose pool session, so we should still be able to get.
-    session = (WmTezSession) wm.getSession(null, new MappingInput("user", null), conf, null);
+    session = (WmTezSession) wm.getSession(null, new MappingInput("user"), conf);
     session.returnToSessionManager();
     assertEquals(1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
@@ -322,20 +321,20 @@ public class TestWorkloadManager {
     assertEquals(5, wm.getNumSessions());
     // Get all the 5 sessions; validate cluster fractions.
     WmTezSession session05of06 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p1", null), conf, null);
+        null, new MappingInput("p1"), conf);
     assertEquals(0.3, session05of06.getClusterFraction(), EPSILON);
     WmTezSession session03of06 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p2", null), conf, null);
+        null, new MappingInput("p2"), conf);
     assertEquals(0.18, session03of06.getClusterFraction(), EPSILON);
     WmTezSession session03of06_2 = (WmTezSession) wm.getSession(
-        null, new MappingInput("p2", null), conf, null);
+        null, new MappingInput("p2"), conf);
     assertEquals(0.09, session03of06.getClusterFraction(), EPSILON);
     assertEquals(0.09, session03of06_2.getClusterFraction(), EPSILON);
     WmTezSession session02of06 = (WmTezSession) wm.getSession(
-        null,new MappingInput("r1", null), conf, null);
+        null, new MappingInput("r1"), conf);
     assertEquals(0.12, session02of06.getClusterFraction(), EPSILON);
     WmTezSession session04 = (WmTezSession) wm.getSession(
-        null, new MappingInput("r2", null), conf, null);
+        null, new MappingInput("r2"), conf);
     assertEquals(0.4, session04.getClusterFraction(), EPSILON);
     session05of06.returnToSessionManager();
     session03of06.returnToSessionManager();
@@ -347,6 +346,7 @@ public class TestWorkloadManager {
   @Test(timeout = 10000)
   public void testMappings() throws Exception {
     HiveConf conf = createConf();
+    conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "false");
     MockQam qam = new MockQam();
     WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
         Lists.newArrayList(pool("u0"), pool("g0"), pool("g1"), pool("u2")));
@@ -363,6 +363,31 @@ public class TestWorkloadManager {
     verifyMapping(wm, conf, new MappingInput("u0", groups("g0")), "u0");
     verifyMapping(wm, conf, new MappingInput("u2", groups("g1")), "g1");
     verifyMapping(wm, conf, new MappingInput("u2", groups("g0", "g1")), "g0");
+    // Check explicit pool specifications - valid cases where priority is changed.
+    verifyMapping(wm, conf, new MappingInput("u0", groups("g1"), "g1"), "g1");
+    verifyMapping(wm, conf, new MappingInput("u2", groups("g1"), "u2"), "u2");
+    verifyMapping(wm, conf, new MappingInput("zzz", groups("g0", "g1"), "g1"), "g1");
+    // Explicit pool specification - invalid - there's no mapping that matches.
+    try {
+      TezSessionState r = wm.getSession(
+        null, new MappingInput("u0", groups("g0", "g1"), "u2"), conf);
+      fail("Expected failure, but got " + r);
+    } catch (Exception ex) {
+      // Expected.
+    }
+    // Now allow the users to specify any pools.
+    conf.set(ConfVars.HIVE_SERVER2_WM_ALLOW_ANY_POOL_VIA_JDBC.varname, "true");
+    wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+    verifyMapping(wm, conf, new MappingInput("u0", groups("g0", "g1"), "u2"), "u2");
+    // The mapping that doesn't exist still shouldn't work.
+    try {
+      TezSessionState r = wm.getSession(
+        null, new MappingInput("u0", groups("g0", "g1"), "zzz"), conf);
+      fail("Expected failure, but got " + r);
+    } catch (Exception ex) {
+      // Expected.
+    }
   }
 
   private static void verifyMapping(
@@ -372,6 +397,9 @@ public class TestWorkloadManager {
     session.returnToSessionManager();
   }
 
+  
+
+
   @Test(timeout=10000)
   public void testQueueing() throws Exception {
     final HiveConf conf = createConf();
@@ -381,9 +409,9 @@ public class TestWorkloadManager {
     plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
-        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
-        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
     final AtomicReference<WmTezSession> sessionA3 = new AtomicReference<>(),
         sessionA4 = new AtomicReference<>();
     final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -397,7 +425,7 @@ public class TestWorkloadManager {
     assertNull(sessionA4.get());
     checkError(error);
     // While threads are blocked on A, we should still be able to get and return a B session.
-    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null);
+    WmTezSession sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf);
     sessionB1.returnToSessionManager();
     sessionB2.returnToSessionManager();
     assertNull(sessionA3.get());
@@ -425,8 +453,8 @@ public class TestWorkloadManager {
     plan.getPlan().setDefaultPoolPath("A");
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
-    WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
-        session2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession session1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+        session2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
     assertEquals(0.5, session1.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
     qam.assertWasCalledAndReset();
@@ -448,19 +476,19 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, 2, qam);
     wm.start();
     WmTezSession session1 = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf, null);
+        null, new MappingInput("user"), conf);
     // First, try to reuse from the same pool - should "just work".
     WmTezSession session1a = (WmTezSession) wm.getSession(
-        session1, new MappingInput("user", null), conf, null);
+        session1, new MappingInput("user"), conf);
     assertSame(session1, session1a);
     assertEquals(1.0, session1.getClusterFraction(), EPSILON);
     // Should still be able to get the 2nd session.
     WmTezSession session2 = (WmTezSession) wm.getSession(
-        null, new MappingInput("user", null), conf, null);
+        null, new MappingInput("user"), conf);
 
     // Now try to reuse with no other sessions remaining. Should still work.
     WmTezSession session2a = (WmTezSession) wm.getSession(
-        session2, new MappingInput("user", null), conf, null);
+        session2, new MappingInput("user"), conf);
     assertSame(session2, session2a);
     assertEquals(0.5, session1.getClusterFraction(), EPSILON);
     assertEquals(0.5, session2.getClusterFraction(), EPSILON);
@@ -517,19 +545,19 @@ public class TestWorkloadManager {
     plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
-        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+        sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
     assertEquals("A", sessionA1.getPoolName());
     assertEquals(0.3f, sessionA1.getClusterFraction(), EPSILON);
     assertEquals("A", sessionA2.getPoolName());
     assertEquals(0.3f, sessionA2.getClusterFraction(), EPSILON);
-    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B", null), conf, null);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(sessionA1, new MappingInput("B"), conf);
     assertSame(sessionA1, sessionB1);
     assertEquals("B", sessionB1.getPoolName());
     assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
     assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON); // A1 removed from A.
     // Make sure that we can still get a session from A.
-    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
     assertEquals("A", sessionA3.getPoolName());
     assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
     assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
@@ -549,7 +577,7 @@ public class TestWorkloadManager {
     wm.start();
  
     // One session will be running, the other will be queued in "A"
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
     assertEquals("A", sessionA1.getPoolName());
     assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
     final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
@@ -574,7 +602,7 @@ public class TestWorkloadManager {
     assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
     // The new session will also go to B now.
     sessionA2.get().returnToSessionManager();
-    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U", null), conf, null);
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("U"), conf);
     assertEquals("B", sessionB1.getPoolName());
     assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
     sessionA1.returnToSessionManager();
@@ -598,11 +626,11 @@ public class TestWorkloadManager {
  
     // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
     // Total: 5/6 running.
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null),
-        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null),
-        sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B", null), conf, null),
-        sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C", null), conf, null),
-        sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf),
+        sessionB1 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
+        sessionB2 = (WmTezSession) wm.getSession(null, new MappingInput("B"), conf),
+        sessionC1 = (WmTezSession) wm.getSession(null, new MappingInput("C"), conf),
+        sessionD1 = (WmTezSession) wm.getSession(null, new MappingInput("D"), conf);
     final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
         sessionD2 = new AtomicReference<>();
     final AtomicReference<Throwable> error = new AtomicReference<>();
@@ -738,7 +766,7 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
 
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
 
     // [A: 1, B: 0]
     Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -762,7 +790,7 @@ public class TestWorkloadManager {
     assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
     assertEquals("B", sessionA1.getPoolName());
 
-    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
     // [A: 1, B: 1]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -789,7 +817,7 @@ public class TestWorkloadManager {
     assertEquals("B", sessionA2.getPoolName());
     assertEquals("B", sessionA1.getPoolName());
 
-    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
     // [A: 1, B: 2]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -829,7 +857,7 @@ public class TestWorkloadManager {
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
 
-    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
 
     // [A: 1, B: 0, B.x: 0, B.y: 0, C: 0]
     Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
@@ -887,7 +915,8 @@ public class TestWorkloadManager {
     assertTrue(allSessionProviders.get("B.x").getSessions().contains(sessionA1));
     assertEquals("B.x", sessionA1.getPoolName());
 
-    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
+
     // [A: 1, B: 0, B.x: 1, B.y: 0, C: 0]
     allSessionProviders = wm.getAllSessionTriggerProviders();
     assertEquals(1, allSessionProviders.get("A").getSessions().size());
@@ -986,7 +1015,7 @@ public class TestWorkloadManager {
     failedWait.setException(new Exception("foo"));
     theOnlySession.setWaitForAmRegistryFuture(failedWait);
     try {
-      TezSessionState r = wm.getSession(null, new MappingInput("A", null), conf, null);
+      TezSessionState r = wm.getSession(null, new MappingInput("A"), conf);
       fail("Expected an error but got " + r);
     } catch (Exception ex) {
       // Expected.
@@ -1037,7 +1066,7 @@ public class TestWorkloadManager {
     assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
     pool.returnSession(theOnlySession);
     // Make sure we can actually get a session still - parallelism/etc. should not be affected.
-    WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A", null), conf, null);
+    WmTezSession result = (WmTezSession) wm.getSession(null, new MappingInput("A"), conf);
     assertEquals(sessionPoolName, result.getPoolName());
     assertEquals(1f, result.getClusterFraction(), EPSILON);
     result.returnToSessionManager();