You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/07/13 04:36:16 UTC

[04/10] hive git commit: HIVE-14111 : better concurrency handling for TezSessionState - part I (Sergey Shelukhin, reviewed by Siddharth Seth)

HIVE-14111 : better concurrency handling for TezSessionState - part I (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/09fb8f12
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/09fb8f12
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/09fb8f12

Branch: refs/heads/branch-2.1
Commit: 09fb8f127c067089a27f218cbe1dc680ee6b41e1
Parents: 605e5aa
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Jul 12 21:05:14 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Jul 12 21:06:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |   2 +-
 .../hive/ql/exec/tez/TezSessionPoolManager.java |  69 +++++-----
 .../hive/ql/exec/tez/TezSessionState.java       |  31 ++++-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 135 ++++++++++---------
 .../hadoop/hive/ql/session/SessionState.java    |  17 ++-
 .../hive/ql/exec/tez/TestTezSessionPool.java    |   4 +-
 .../cli/operation/MetadataOperation.java        |   2 +-
 .../hive/service/cli/operation/Operation.java   |   4 +-
 .../service/cli/operation/SQLOperation.java     |   1 +
 9 files changed, 155 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index fa68b75..655352d 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -981,8 +981,8 @@ public class QTestUtil {
         && (clusterType == MiniClusterType.tez || clusterType == MiniClusterType.llap)) {
       // Copy the tezSessionState from the old CliSessionState.
       tezSessionState = oldSs.getTezSession();
-      ss.setTezSession(tezSessionState);
       oldSs.setTezSession(null);
+      ss.setTezSession(tezSessionState);
       oldSs.close();
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index f8f3cad..917268f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -45,6 +45,7 @@ import java.util.Set;
 
 import javax.security.auth.login.LoginException;
 
+import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -82,7 +83,8 @@ public class TezSessionPoolManager {
 
   private Semaphore llapQueue;
   private HiveConf initConf = null;
-  int numConcurrentLlapQueries = -1;
+  // Config settings.
+  private int numConcurrentLlapQueries = -1;
   private long sessionLifetimeMs = 0;
   private long sessionLifetimeJitterMs = 0;
   /** A queue for initial sessions that have not been started yet. */
@@ -112,10 +114,11 @@ public class TezSessionPoolManager {
   }
 
   private void startInitialSession(TezSessionPoolSession sessionState) throws Exception {
-    HiveConf newConf = new HiveConf(initConf);
+    HiveConf newConf = new HiveConf(initConf); // TODO Why is this configuration management not happening inside TezSessionPool?
+    // It makes no sense for it to be mixed up like this.
     boolean isUsable = sessionState.tryUse();
     if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup");
-    newConf.set("tez.queue.name", sessionState.getQueueName());
+    newConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
     sessionState.open(newConf);
     if (sessionState.returnAfterUse()) {
       defaultQueuePool.put(sessionState);
@@ -134,6 +137,7 @@ public class TezSessionPoolManager {
         startInitialSession(session);
       }
     } else {
+      // TODO What is this doing now?
       final SessionState parentSessionState = SessionState.get();
       // The runnable has no mutable state, so each thread can run the same thing.
       final AtomicReference<Exception> firstError = new AtomicReference<>(null);
@@ -150,6 +154,7 @@ public class TezSessionPoolManager {
             } catch (Exception e) {
               if (!firstError.compareAndSet(null, e)) {
                 LOG.error("Failed to start session; ignoring due to previous error", e);
+                // TODO Why even continue after this? We're already in a state where things are messed up.
               }
             }
           }
@@ -248,8 +253,11 @@ public class TezSessionPoolManager {
     }
   }
 
+  // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration
   private TezSessionPoolSession createAndInitSession(String queue, boolean isDefault) {
     TezSessionPoolSession sessionState = createSession(TezSessionState.makeSessionId());
+    // TODO When will the queue ever be null?
+    // Pass queue and default in as constructor parameters, and make them final.
     if (queue != null) {
       sessionState.setQueueName(queue);
     }
@@ -266,6 +274,7 @@ public class TezSessionPoolManager {
       throws Exception {
     String queueName = conf.get("tez.queue.name");
 
+    // TODO Session re-use completely disabled for doAs=true. Always launches a new session.
     boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
 
     /*
@@ -333,7 +342,7 @@ public class TezSessionPoolManager {
 
   public static void closeIfNotDefault(
       TezSessionState tezSessionState, boolean keepTmpDir) throws Exception {
-    LOG.info("Closing tez session default? " + tezSessionState.isDefault());
+    LOG.info("Closing tez session if not default: " + tezSessionState);
     if (!tezSessionState.isDefault()) {
       tezSessionState.close(keepTmpDir);
     }
@@ -401,6 +410,11 @@ public class TezSessionPoolManager {
     try {
       UserGroupInformation ugi = Utils.getUGI();
       String userName = ugi.getShortUserName();
+      // TODO Will these checks work if some other user logs in? Isn't a doAs check required somewhere here as well?
+      // Should a doAs check happen here instead of after the user test?
+      // With HiveServer2 - who is the incoming user in terms of UGI (the hive user itself, or the user who actually submitted the query)?
+
+      // Working on the assumption that the user here will be the hive user if doAs = false, we'll make it past the user-mismatch check below.
       LOG.info("The current user: " + userName + ", session user: " + session.getUser());
       if (userName.equals(session.getUser()) == false) {
         LOG.info("Different users incoming: " + userName + " existing: " + session.getUser());
@@ -412,34 +426,19 @@ public class TezSessionPoolManager {
 
     boolean doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
     // either variables will never be null because a default value is returned in case of absence
-    if (doAsEnabled != conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+    if (doAsEnabled != session.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
       return false;
     }
 
     if (!session.isDefault()) {
       String queueName = session.getQueueName();
-      LOG.info("Current queue name is " + queueName + " incoming queue name is "
-          + conf.get("tez.queue.name"));
-      if (queueName == null) {
-        if (conf.get("tez.queue.name") != null) {
-          // queue names are different
-          return false;
-        } else {
-          return true;
-        }
-      }
-
-      if (!queueName.equals(conf.get("tez.queue.name"))) {
-        // the String.equals method handles the case of conf not having the queue name as well.
-        return false;
-      }
+      String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
+      LOG.info("Current queue name is " + queueName + " incoming queue name is " + confQueueName);
+      return (queueName == null) ? confQueueName == null : queueName.equals(confQueueName);
     } else {
       // this session should never be a default session unless something has messed up.
-      throw new HiveException("Default queue should always be returned." +
-      "Hence we should not be here.");
+      throw new HiveException("The pool session " + session + " should have been returned to the pool"); 
     }
-
-    return true;
   }
 
   public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen,
@@ -447,6 +446,7 @@ public class TezSessionPoolManager {
     if (llap && (this.numConcurrentLlapQueries > 0)) {
       llapQueue.acquire(); // blocks if no more llap queries can be submitted.
     }
+
     if (canWorkWithSameSession(session, conf)) {
       return session;
     }
@@ -458,13 +458,16 @@ public class TezSessionPoolManager {
     return getSession(conf, doOpen, forceCreate);
   }
 
-  public void closeAndOpen(TezSessionState sessionState, HiveConf conf,
+  /** Reopens the session that was found to not be running. */
+  public void reopenSession(TezSessionState sessionState, HiveConf conf,
       String[] additionalFiles, boolean keepTmpDir) throws Exception {
     HiveConf sessionConf = sessionState.getConf();
-    if (sessionConf != null && sessionConf.get("tez.queue.name") != null) {
-      conf.set("tez.queue.name", sessionConf.get("tez.queue.name"));
+    if (sessionConf != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) != null) {
+      conf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME));
     }
-    closeIfNotDefault(sessionState, keepTmpDir);
+    // TODO: close basically resets the object to a bunch of nulls.
+    //       We should ideally not reuse the object because it's pointless and error-prone.
+    sessionState.close(keepTmpDir); // Clean up stuff.
     sessionState.open(conf, additionalFiles);
   }
 
@@ -479,7 +482,8 @@ public class TezSessionPoolManager {
     }
   }
 
-  private void closeAndReopen(TezSessionPoolSession oldSession) throws Exception {
+  /** Closes a running (expired) pool session and reopens it. */
+  private void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
     String queueName = oldSession.getQueueName();
     HiveConf conf = oldSession.getConf();
     Path scratchDir = oldSession.getTezScratchDir();
@@ -502,7 +506,7 @@ public class TezSessionPoolManager {
         TezSessionPoolSession next = restartQueue.take();
         LOG.info("Restarting the expired session [" + next + "]");
         try {
-          closeAndReopen(next);
+          closeAndReopenPoolSession(next);
         } catch (InterruptedException ie) {
           throw ie;
         } catch (Exception e) {
@@ -573,7 +577,7 @@ public class TezSessionPoolManager {
    * if it's time, the expiration is triggered; in that case, or if it was already triggered, the
    * caller gets a different session. When the session is in use when it expires, the expiration
    * thread ignores it and lets the return to the pool take care of the expiration.
-   * */
+   */
   @VisibleForTesting
   static class TezSessionPoolSession extends TezSessionState {
     private static final int STATE_NONE = 0, STATE_IN_USE = 1, STATE_EXPIRED = 2;
@@ -670,6 +674,7 @@ public class TezSessionPoolManager {
     public boolean tryExpire(boolean isAsync) throws Exception {
       if (expirationNs == null) return true;
       if (!shouldExpire()) return false;
+      // Try to expire the session if it's not in use; if in use, bail.
       while (true) {
         if (sessionState.get() != STATE_NONE) return true; // returnAfterUse will take care of this
         if (sessionState.compareAndSet(STATE_NONE, STATE_EXPIRED)) {
@@ -683,7 +688,7 @@ public class TezSessionPoolManager {
       if (async) {
         parent.restartQueue.add(this);
       } else {
-        parent.closeAndReopen(this);
+        parent.closeAndReopenPoolSession(this);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index d04cfa3..1009359 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.login.LoginException;
 
@@ -115,6 +116,8 @@ public class TezSessionState {
   private boolean defaultQueue = false;
   private String user;
 
+  private AtomicReference<String> ownerThread = new AtomicReference<>(null);
+
   private final Set<String> additionalFilesNotFromConf = new HashSet<String>();
   private final Set<LocalResource> localizedResources = new HashSet<LocalResource>();
   private boolean doAsEnabled;
@@ -129,7 +132,7 @@ public class TezSessionState {
 
   public String toString() {
     return "sessionId=" + sessionId + ", queueName=" + queueName + ", user=" + user
-        + ", doAs=" + doAsEnabled + ", isOpen=" + isOpen();
+        + ", doAs=" + doAsEnabled + ", isOpen=" + isOpen() + ", isDefault=" + defaultQueue;
   }
 
   /**
@@ -226,12 +229,14 @@ public class TezSessionState {
       boolean isAsync, LogHelper console, Path scratchDir) throws IOException, LoginException,
         IllegalArgumentException, URISyntaxException, TezException {
     this.conf = conf;
-    this.queueName = conf.get("tez.queue.name");
+    // TODO Why is the queue name set again? It has already been set up via setQueueName. Do only one of the two.
+    this.queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
     this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
 
-    final boolean llapMode = "llap".equals(HiveConf.getVar(
+    final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(
         conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
 
+    // TODO This - at least for the session pool - will always be the hive user. How does doAs above this affect things?
     UserGroupInformation ugi = Utils.getUGI();
     user = ugi.getShortUserName();
     LOG.info("User of session id " + sessionId + " is " + user);
@@ -285,6 +290,7 @@ public class TezSessionState {
         llapCredentials = new Credentials();
         llapCredentials.addToken(LlapTokenIdentifier.KIND_NAME, getLlapToken(user, tezConfig));
       }
+      // TODO Change this to not serialize the entire Configuration - minor.
       UserPayload servicePluginPayload = TezUtils.createUserPayloadFromConf(tezConfig);
       // we need plugins to handle llap and uber mode
       servicePluginsDescriptor = ServicePluginsDescriptor.create(true,
@@ -664,7 +670,7 @@ public class TezSessionState {
   }
 
   public void setDefault() {
-    defaultQueue  = true;
+    defaultQueue = true;
   }
 
   public boolean isDefault() {
@@ -686,4 +692,21 @@ public class TezSessionState {
   public boolean getDoAsEnabled() {
     return doAsEnabled;
   }
+
+  /** Mark session as free for use from TezTask, for safety/debugging purposes. */
+  public void markFree() {
+    if (ownerThread.getAndSet(null) == null) throw new AssertionError("Not in use");
+  }
+
+  /** Mark session as being in use from TezTask, for safety/debugging purposes. */
+  public void markInUse() {
+    String newName = Thread.currentThread().getName();
+    do {
+      String oldName = ownerThread.get();
+      if (oldName != null) {
+        throw new AssertionError("Tez session is already in use from "
+            + oldName + "; cannot use from " + newName);
+      }
+    } while (!ownerThread.compareAndSet(null, newName));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 9e114c0..e4b69a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -130,81 +130,73 @@ public class TezTask extends Task<TezWork> {
       // Need to remove this static hack. But this is the way currently to get a session.
       SessionState ss = SessionState.get();
       session = ss.getTezSession();
-      session =
-          TezSessionPoolManager.getInstance().getSession(session, conf, false,
-              getWork().getLlapMode());
+      session = TezSessionPoolManager.getInstance().getSession(
+          session, conf, false, getWork().getLlapMode());
       ss.setTezSession(session);
+      try {
+        // jobConf will hold all the configuration for hadoop, tez, and hive
+        JobConf jobConf = utils.createConfiguration(conf);
 
-      // jobConf will hold all the configuration for hadoop, tez, and hive
-      JobConf jobConf = utils.createConfiguration(conf);
+        // Get all user jars from work (e.g. input format stuff).
+        String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf);
 
-      // Get all user jars from work (e.g. input format stuff).
-      String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf);
+        // we will localize all the files (jars, plans, hashtables) to the
+        // scratch dir. let's create this and tmp first.
+        Path scratchDir = ctx.getMRScratchDir();
 
-      // we will localize all the files (jars, plans, hashtables) to the
-      // scratch dir. let's create this and tmp first.
-      Path scratchDir = ctx.getMRScratchDir();
+        // create the tez tmp dir
+        scratchDir = utils.createTezDir(scratchDir, conf);
 
-      // create the tez tmp dir
-      scratchDir = utils.createTezDir(scratchDir, conf);
+        Map<String,LocalResource> inputOutputLocalResources =
+            getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
 
-      Map<String,LocalResource> inputOutputLocalResources =
-          getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
+        // Ensure the session is open and has the necessary local resources
+        updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
 
-      // Ensure the session is open and has the necessary local resources
-      updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
+        List<LocalResource> additionalLr = session.getLocalizedResources();
+        logResources(additionalLr);
 
-      List<LocalResource> additionalLr = session.getLocalizedResources();
+        // unless already installed on all the cluster nodes, we'll have to
+        // localize hive-exec.jar as well.
+        LocalResource appJarLr = session.getAppJarLr();
 
-      // log which resources we're adding (apart from the hive exec)
-      if (LOG.isDebugEnabled()) {
-        if (additionalLr == null || additionalLr.size() == 0) {
-          LOG.debug("No local resources to process (other than hive-exec)");
-        } else {
-          for (LocalResource lr: additionalLr) {
-            LOG.debug("Adding local resource: " + lr.getResource());
-          }
-        }
-      }
+        // next we translate the TezWork to a Tez DAG
+        DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
+        CallerContext callerContext = CallerContext.create(
+            "HIVE", queryPlan.getQueryId(),
+            "HIVE_QUERY_ID", queryPlan.getQueryStr());
+        dag.setCallerContext(callerContext);
 
-      // unless already installed on all the cluster nodes, we'll have to
-      // localize hive-exec.jar as well.
-      LocalResource appJarLr = session.getAppJarLr();
-
-      // next we translate the TezWork to a Tez DAG
-      DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
-      CallerContext callerContext = CallerContext.create(
-          "HIVE", queryPlan.getQueryId(),
-          "HIVE_QUERY_ID", queryPlan.getQueryStr());
-      dag.setCallerContext(callerContext);
-
-      // Add the extra resources to the dag
-      addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
-
-      // submit will send the job to the cluster and start executing
-      dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
-          additionalLr, inputOutputJars, inputOutputLocalResources);
-
-      // finally monitor will print progress until the job is done
-      TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
-      rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
-      if (rc != 0) {
-        this.setException(new HiveException(monitor.getDiagnostics()));
-      }
+        // Add the extra resources to the dag
+        addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
 
-      // fetch the counters
-      try {
-        Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-        counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
-      } catch (Exception err) {
-        // Don't fail execution due to counters - just don't print summary info
-        LOG.error("Failed to get counters: " + err, err);
-        counters = null;
+        // submit will send the job to the cluster and start executing
+        dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
+            additionalLr, inputOutputJars, inputOutputLocalResources);
+
+        // finally monitor will print progress until the job is done
+        TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
+        rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
+        if (rc != 0) {
+          this.setException(new HiveException(monitor.getDiagnostics()));
+        }
+
+        // fetch the counters
+        try {
+          Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+          counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
+        } catch (Exception err) {
+          // Don't fail execution due to counters - just don't print summary info
+          LOG.error("Failed to get counters: " + err, err);
+          counters = null;
+        }
+      } finally {
+        // We return this to the pool even if it's unusable; reopen is supposed to handle this.
+        TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode());
       }
-      TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode());
 
       if (LOG.isInfoEnabled() && counters != null
-          && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
+          && (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
           Utilities.isPerfOrAboveLogging(conf))) {
         for (CounterGroup group: counters) {
           LOG.info(group.getDisplayName() +":");
@@ -244,6 +236,18 @@ public class TezTask extends Task<TezWork> {
     return rc;
   }
 
+  private void logResources(List<LocalResource> additionalLr) {
+    // log which resources we're adding (apart from the hive exec)
+    if (!LOG.isDebugEnabled()) return;
+    if (additionalLr == null || additionalLr.size() == 0) {
+      LOG.debug("No local resources to process (other than hive-exec)");
+    } else {
+      for (LocalResource lr: additionalLr) {
+        LOG.debug("Adding local resource: " + lr.getResource());
+      }
+    }
+  }
+
   /**
    * Converted the list of jars into local resources
    */
@@ -270,6 +274,7 @@ public class TezTask extends Task<TezWork> {
         .hasResources(inputOutputJars);
 
     TezClient client = session.getSession();
+    // TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case?
     if (client == null) {
       // can happen if the user sets the tez flag after the session was
       // established
@@ -448,8 +453,10 @@ public class TezTask extends Task<TezWork> {
         console.printInfo("Tez session was closed. Reopening...");
 
         // close the old one, but keep the tmp files around
-        TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars,
-            true);
+        // TODO Why is the session being created using a conf instance belonging to TezTask
+        //      rather than the session's own conf instance?
+        TezSessionPoolManager.getInstance().reopenSession(
+            sessionState, this.conf, inputOutputJars, true);
         console.printInfo("Session re-established.");
 
         dagClient = sessionState.getSession().submitDAG(dag);
@@ -459,7 +466,7 @@ public class TezTask extends Task<TezWork> {
       try {
         console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
             + Arrays.toString(e.getStackTrace()) + " retrying...");
-        TezSessionPoolManager.getInstance().closeAndOpen(sessionState, this.conf, inputOutputJars,
+        TezSessionPoolManager.getInstance().reopenSession(sessionState, this.conf, inputOutputJars,
             true);
         dagClient = sessionState.getSession().submitDAG(dag);
       } catch (Exception retryException) {

http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index ce43f7d..e318b92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -583,7 +583,7 @@ public class SessionState {
 
     try {
       if (startSs.tezSessionState == null) {
-        startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
+        startSs.setTezSession(new TezSessionState(startSs.getSessionId()));
       }
       if (startSs.tezSessionState.isOpen()) {
         return;
@@ -1487,12 +1487,12 @@ public class SessionState {
 
     try {
       if (tezSessionState != null) {
-        TezSessionPoolManager.getInstance().closeIfNotDefault(tezSessionState, false);
+        TezSessionPoolManager.closeIfNotDefault(tezSessionState, false);
       }
     } catch (Exception e) {
       LOG.info("Error closing tez session", e);
     } finally {
-      tezSessionState = null;
+      setTezSession(null);
     }
 
     try {
@@ -1577,8 +1577,17 @@ public class SessionState {
     return tezSessionState;
   }
 
+  /** Called from TezTask to attach a TezSession to use to the threadlocal. Ugly pattern... */
   public void setTezSession(TezSessionState session) {
-    this.tezSessionState = session;
+    if (tezSessionState == session) return; // The same object.
+    if (tezSessionState != null) {
+      tezSessionState.markFree();
+      tezSessionState = null;
+    }
+    if (session != null) {
+      session.markInUse();
+    }
+    tezSessionState = session;
   }
 
   public String getUserName() {

http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index 956fd29..ec90801 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -238,7 +238,7 @@ public class TestTezSessionPool {
     TezSessionState session = Mockito.mock(TezSessionState.class);
     Mockito.when(session.isDefault()).thenReturn(false);
 
-    poolManager.closeAndOpen(session, conf, null, false);
+    poolManager.reopenSession(session, conf, null, false);
 
     Mockito.verify(session).close(false);
     String[] files = null;
@@ -261,7 +261,7 @@ public class TestTezSessionPool {
     Mockito.when(session.isDefault()).thenReturn(false);
     String[] extraResources = new String[] { "file:///tmp/foo.jar" };
 
-    poolManager.closeAndOpen(session, conf, extraResources, false);
+    poolManager.reopenSession(session, conf, extraResources, false);
 
     Mockito.verify(session).close(false);
     Mockito.verify(session).open(conf, extraResources);

http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
index 44463c9..d216454 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java
@@ -44,7 +44,7 @@ public abstract class MetadataOperation extends Operation {
   private static final char SEARCH_STRING_ESCAPE = '\\';
 
   protected MetadataOperation(HiveSession parentSession, OperationType opType) {
-    super(parentSession, opType, false);
+    super(parentSession, opType);
     setHasResultSet(true);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 4e56ea5..c3be295 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -85,8 +85,8 @@ public abstract class Operation {
   protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
       EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
 
-  protected Operation(HiveSession parentSession, OperationType opType, boolean runInBackground) {
-    this(parentSession, null, opType, runInBackground);
+  protected Operation(HiveSession parentSession, OperationType opType) {
+    this(parentSession, null, opType, false);
  }
 
   protected Operation(HiveSession parentSession, Map<String, String> confOverlay, OperationType opType, boolean runInBackground) {

http://git-wip-us.apache.org/repos/asf/hive/blob/09fb8f12/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 81e790c..57c954a 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -290,6 +290,7 @@ public class SQLOperation extends ExecuteStatementOperation {
             @Override
             public Object run() throws HiveSQLException {
               Hive.set(parentHive);
+              // TODO: can this result in cross-thread reuse of session state?
               SessionState.setCurrentSessionState(parentSessionState);
               PerfLogger.setPerfLogger(parentPerfLogger);
               // Set current OperationLog in this async thread for keeping on saving query log.