You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/02 18:40:52 UTC

[3/3] hive git commit: HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/77b99e4c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/77b99e4c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/77b99e4c

Branch: refs/heads/master
Commit: 77b99e4c9e39b9905ff98a002ca35f7edd9b7b30
Parents: c5a9673
Author: sergey <se...@apache.org>
Authored: Thu Nov 2 11:30:38 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Nov 2 11:30:38 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    3 +
 .../hive/jdbc/TestTriggersWorkloadManager.java  |   12 +-
 .../hadoop/hive/ql/exec/tez/AmPluginNode.java   |    2 +-
 .../ql/exec/tez/GuaranteedTasksAllocator.java   |   29 +-
 .../exec/tez/LlapPluginEndpointClientImpl.java  |    2 +-
 .../ql/exec/tez/QueryAllocationManager.java     |    3 +-
 .../ql/exec/tez/SessionExpirationTracker.java   |   16 +-
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java |  371 ++++-
 .../hive/ql/exec/tez/TezSessionPoolManager.java |   74 +-
 .../hive/ql/exec/tez/TezSessionPoolSession.java |   26 +-
 .../hive/ql/exec/tez/TezSessionState.java       |    2 +-
 .../hive/ql/exec/tez/UserPoolMapping.java       |   16 +-
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   |  109 +-
 .../hive/ql/exec/tez/WorkloadManager.java       | 1549 ++++++++++++++----
 .../physical/LlapClusterStateForCompile.java    |   13 +-
 .../hive/ql/exec/tez/SampleTezSessionState.java |   44 +-
 .../hive/ql/exec/tez/TestWorkloadManager.java   |  365 ++++-
 17 files changed, 2093 insertions(+), 543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a3c853a..b65bdab 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2419,6 +2419,9 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
         "A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
         "workload management is enabled and used for these sessions."),
+    HIVE_SERVER2_TEZ_WM_WORKER_THREADS("hive.server2.tez.wm.worker.threads", 4,
+        "Number of worker threads to use to perform the synchronous operations with Tez\n" +
+        "sessions for workload management (e.g. opening, closing, etc.)"),
     HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
         new TimeValidator(TimeUnit.SECONDS),
         "The timeout for AM registry registration, after which (on attempting to use the\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index fdb660a..0ec7e85 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -16,17 +16,21 @@
 
 package org.apache.hive.jdbc;
 
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
@@ -70,6 +74,10 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
   @Override
   protected void setupTriggers(final List<Trigger> triggers) throws Exception {
     WorkloadManager wm = WorkloadManager.getInstance();
-    wm.getPools().get("llap").setTriggers(triggers);
+    TmpResourcePlan rp = new TmpResourcePlan(Lists.newArrayList(new TmpHivePool(
+        "llap", null, 1, 1.0f, triggers)), Lists.newArrayList(
+            new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 1)));
+    wm.updateResourcePlanAsync(rp).get(10, TimeUnit.SECONDS);
   }
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
index 35d380c..e4a21cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
@@ -39,5 +39,5 @@ public interface AmPluginNode {
     }
   }
 
-  AmPluginInfo waitForAmPluginInfo(int timeoutMs) throws InterruptedException, TimeoutException;
+  AmPluginInfo getAmPluginInfo() throws InterruptedException, TimeoutException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d978a25..53dd698 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -64,6 +64,8 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
 
   @Override
   public void start() {
+    // Try to get cluster information once, to avoid immediate cluster-update event in WM.
+    clusterState.initClusterInfo();
     clusterStateUpdateThread.start();
   }
 
@@ -72,9 +74,13 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
     clusterStateUpdateThread.interrupt(); // Don't wait for the thread.
   }
 
+  public void initClusterInfo() {
+    clusterState.initClusterInfo();
+  }
+
   @VisibleForTesting
   protected int getExecutorCount(boolean allowUpdate) {
-    if (!clusterState.initClusterInfo(allowUpdate)) {
+    if (allowUpdate && !clusterState.initClusterInfo()) {
       LOG.warn("Failed to get LLAP cluster information for "
           + HiveConf.getTrimmedVar(this.conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS)
           + "; we may rely on outdated cluster status");
@@ -92,15 +98,18 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
   }
 
   @Override
-  public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
-    // Do not make a remote call unless we have no information at all.
+  public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
+    // Do not make a remote call under any circumstances - this is supposed to be async.
     int totalCount = getExecutorCount(false);
-    int totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+    int totalToDistribute = -1;
+    if (totalMaxAlloc != null) {
+      totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+    }
     double lastDelta = 0;
     for (int i = 0; i < sessionsToUpdate.size(); ++i) {
       WmTezSession session = sessionsToUpdate.get(i);
       int intAlloc = -1;
-      if (i + 1 == sessionsToUpdate.size()) {
+      if (i + 1 == sessionsToUpdate.size() && totalToDistribute >= 0) {
         intAlloc = totalToDistribute;
         // We rely on the caller to supply a reasonable total; we could log a warning
         // if this doesn't match the allocation of the last session beyond some threshold.
@@ -119,10 +128,12 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
         intAlloc = (int)roundedAlloc;
       }
       // Make sure we don't give out more than allowed due to double/rounding artifacts.
-      if (intAlloc > totalToDistribute) {
-        intAlloc = totalToDistribute;
+      if (totalToDistribute >= 0) {
+        if (intAlloc > totalToDistribute) {
+          intAlloc = totalToDistribute;
+        }
+        totalToDistribute -= intAlloc;
       }
-      totalToDistribute -= intAlloc;
       // This will only send update if it's necessary.
       updateSessionAsync(session, intAlloc);
     }
@@ -162,7 +173,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
       // RPC already handles retries, so we will just try to kill the session here.
       // This will cause the current query to fail. We could instead keep retrying.
       try {
-        session.destroy();
+        session.handleUpdateError();
       } catch (Exception e) {
         LOG.error("Failed to kill the session " + session);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
index 45c3e38..4f373162 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
@@ -114,7 +114,7 @@ public class LlapPluginEndpointClientImpl extends
 
     private void ensureInfo() throws InterruptedException, TimeoutException {
       if (info != null) return;
-      info = node.waitForAmPluginInfo(0); // Don't wait - should already be initialized.
+      info = node.getAmPluginInfo(); // Don't wait - should already be initialized.
       if (info == null) {
         throw new AssertionError("A request was created without AM plugin info for " + node);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index a326db3..acacfd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -26,7 +26,8 @@ interface QueryAllocationManager {
    * Updates the session allocations asynchoronously.
    * @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to
    *                      avoid various artifacts, esp. with small numbers and double weirdness.
+   *                      Null means the total is unknown.
    * @param sessions Sessions to update based on their allocation fraction.
    */
-  void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions);
+  void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
index da93a3a..77ff484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
@@ -48,7 +48,7 @@ class SessionExpirationTracker {
   private volatile SessionState initSessionState;
 
   interface RestartImpl {
-    void closeAndReopenPoolSession(TezSessionPoolSession session) throws Exception;
+    void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception;
   }
 
   public static SessionExpirationTracker create(HiveConf conf, RestartImpl restartImpl) {
@@ -114,7 +114,7 @@ class SessionExpirationTracker {
         TezSessionPoolSession next = restartQueue.take();
         LOG.info("Restarting the expired session [" + next + "]");
         try {
-          sessionRestartImpl.closeAndReopenPoolSession(next);
+          sessionRestartImpl.closeAndReopenExpiredSession(next);
         } catch (InterruptedException ie) {
           throw ie;
         } catch (Exception e) {
@@ -225,12 +225,12 @@ class SessionExpirationTracker {
     expirationQueue.remove(session);
   }
 
+  public void closeAndRestartExpiredSessionAsync(TezSessionPoolSession session) {
+    restartQueue.add(session);
+  }
+
   public void closeAndRestartExpiredSession(
-      TezSessionPoolSession session, boolean isAsync) throws Exception {
-    if (isAsync) {
-      restartQueue.add(session);
-    } else {
-      sessionRestartImpl.closeAndReopenPoolSession(session);
-    }
+      TezSessionPoolSession session) throws Exception {
+    sessionRestartImpl.closeAndReopenExpiredSession(session);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index b67c933..9c2b3ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -17,17 +17,25 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Queue;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -45,35 +53,48 @@ import org.slf4j.LoggerFactory;
 class TezSessionPool<SessionType extends TezSessionPoolSession> {
   private static final Logger LOG = LoggerFactory.getLogger(TezSessionPool.class);
 
-  /** A queue for initial sessions that have not been started yet. */
-  private final Queue<SessionType> initialSessions =
-      new ConcurrentLinkedQueue<SessionType>();
+  public interface SessionObjectFactory<SessionType> {
+    SessionType create(SessionType oldSession);
+  }
 
   private final HiveConf initConf;
-  private int initialSize;
+  private int initialSize = 0; // For testing only.
+  private final SessionObjectFactory<SessionType> sessionObjFactory;
 
-  // TODO: eventually, this will need to support resize. That would probably require replacement
-  //       with a RW lock, a semaphore and linked list.
-  private BlockingDeque<SessionType> defaultQueuePool;
+  private final ReentrantLock poolLock = new ReentrantLock(true);
+  private final Condition notEmpty = poolLock.newCondition();
+  private final LinkedList<SessionType> pool = new LinkedList<>();
+  private final LinkedList<SettableFuture<SessionType>> asyncRequests = new LinkedList<>();
+  /**
+   * The number of sessions that need to be started or killed because of the resize calls on
+   * the pool. When increasing the size, we set this to a positive number and start new sessions
+   * on background threads, gradually bringing it back to 0.
+   * When decreasing the size, we try to kill as many existing sessions as we can; if that is
+   * not enough because the sessions are in use or being restarted, we kill them as they are
+   * re-added to the pool.
+   * Repeated calls to resize adjust the delta to ensure correctness between different resizes.
+   */
+  private final AtomicInteger deltaRemaining = new AtomicInteger();
 
   private final String amRegistryName;
   private final TezAmRegistryImpl amRegistry;
 
   private final ConcurrentHashMap<String, SessionType> bySessionId =
       new ConcurrentHashMap<>();
+  // Preserved at initialization time to have a session to use during resize.
+  // TODO: rather, Tez sessions should not depend on SessionState.
+  private SessionState parentSessionState;
 
-
-  TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent) {
+  TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent,
+      SessionObjectFactory<SessionType> sessionFactory) {
     this.initConf = initConf;
-    assert numSessionsTotal > 0;
-    defaultQueuePool = new LinkedBlockingDeque<>(numSessionsTotal);
+    this.initialSize = numSessionsTotal;
     this.amRegistry = useAmRegistryIfPresent ? TezAmRegistryImpl.create(initConf, true) : null;
     this.amRegistryName = amRegistry == null ? null : amRegistry.getRegistryName();
+    this.sessionObjFactory = sessionFactory;
   }
 
-  void startInitialSessions() throws Exception {
-    initialSize = initialSessions.size();
-    if (initialSessions.isEmpty()) return;
+  void start() throws Exception {
     if (amRegistry != null) {
       amRegistry.start();
       amRegistry.initializeWithoutRegistering();
@@ -82,82 +103,145 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       amRegistry.populateCache(true);
     }
 
-    int threadCount = Math.min(initialSessions.size(),
+    int threadCount = Math.min(initialSize,
         HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
     Preconditions.checkArgument(threadCount > 0);
     if (threadCount == 1) {
-      while (true) {
-        SessionType session = initialSessions.poll();
+      for (int i = 0; i < initialSize; ++i) {
+        SessionType session = sessionObjFactory.create(null);
         if (session == null) break;
         startInitialSession(session);
       }
     } else {
-      final SessionState parentSessionState = SessionState.get();
-      // The runnable has no mutable state, so each thread can run the same thing.
-      final AtomicReference<Exception> firstError = new AtomicReference<>(null);
-      Runnable runnable = new Runnable() {
-        public void run() {
-          if (parentSessionState != null) {
-            SessionState.setCurrentSessionState(parentSessionState);
-          }
-          while (true) {
-            SessionType session = initialSessions.poll();
-            if (session == null) break;
-            if (firstError.get() != null) break; // Best-effort.
-            try {
-              startInitialSession(session);
-            } catch (Exception e) {
-              if (!firstError.compareAndSet(null, e)) {
-                LOG.error("Failed to start session; ignoring due to previous error", e);
-              }
-              break;
-            }
-          }
+      final AtomicInteger remaining = new AtomicInteger(initialSize);
+      this.parentSessionState = SessionState.get();
+      @SuppressWarnings("unchecked")
+      FutureTask<Boolean>[] threadTasks = new FutureTask[threadCount];
+      for (int i = threadTasks.length - 1; i >= 0; --i) {
+        threadTasks[i] = new FutureTask<Boolean>(new CreateSessionsRunnable(remaining));
+        if (i == 0) {
+          // Start is blocking, so run one of the tasks on the main thread.
+          threadTasks[i].run();
+        } else {
+          new Thread(threadTasks[i], "Tez session init " + i).start();
         }
-      };
-      Thread[] threads = new Thread[threadCount - 1];
-      for (int i = 0; i < threads.length; ++i) {
-        threads[i] = new Thread(runnable, "Tez session init " + i);
-        threads[i].start();
-      }
-      runnable.run();
-      for (int i = 0; i < threads.length; ++i) {
-        threads[i].join();
       }
-      Exception ex = firstError.get();
-      if (ex != null) {
-        throw ex;
+      for (int i = 0; i < threadTasks.length; ++i) {
+        threadTasks[i].get();
       }
     }
   }
 
-  void addInitialSession(SessionType session) {
-    initialSessions.add(session);
-  }
-
   SessionType getSession() throws Exception {
     while (true) {
-      SessionType result = defaultQueuePool.take();
-      if (result.tryUse()) return result;
+      SessionType result = null;
+      poolLock.lock();
+      try {
+        while ((result = pool.poll()) == null) {
+          notEmpty.await(100, TimeUnit.MILLISECONDS);
+        }
+      } finally {
+        poolLock.unlock();
+      }
+      if (result.tryUse(false)) return result;
       LOG.info("Couldn't use a session [" + result + "]; attempting another one");
     }
   }
 
-  void returnSession(SessionType session) throws Exception {
+  ListenableFuture<SessionType> getSessionAsync() throws Exception {
+    SettableFuture<SessionType> future = SettableFuture.create();
+    poolLock.lock();
+    try {
+      // Try to get the session quickly.
+      while (true) {
+        SessionType result = pool.poll();
+        if (result == null) break;
+        if (!result.tryUse(false)) continue;
+        future.set(result);
+        return future;
+      }
+      // The pool is empty; queue the request.
+      asyncRequests.add(future);
+      return future;
+    } finally {
+      poolLock.unlock();
+    }
+  }
+
+  void returnSession(SessionType session) {
+    returnSessionInternal(session, false);
+  }
+
+  boolean returnSessionAsync(SessionType session) {
+    return returnSessionInternal(session, true);
+  }
+
+  private boolean returnSessionInternal(SessionType session, boolean isAsync) {
     // Make sure that if the session is returned to the pool, it doesn't live in the global.
     SessionState sessionState = SessionState.get();
     if (sessionState != null) {
       sessionState.setTezSession(null);
     }
-    if (session.stopUsing()) {
-      defaultQueuePool.putFirst(session);
+    if (!session.stopUsing()) return true; // The session will be restarted and return to us.
+    boolean canPutBack = putSessionBack(session, true);
+    if (canPutBack) return true;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Closing an unneeded returned session " + session);
+    }
+
+    if (isAsync) return false; // The caller is responsible for destroying the session.
+    try {
+      session.close(false);
+    } catch (Exception ex) {
+      LOG.error("Failed to close " + session, ex);
+    }
+    return true;
+  }
+
+  /**
+   * Puts session back into the pool.
+   * @return true if the session has been put back; false if it's not needed and should be killed.
+   */
+  private boolean putSessionBack(SessionType session, boolean isFirst) {
+    SettableFuture<SessionType> future = null;
+    poolLock.lock();
+    try {
+      // See if we need to kill some sessions because the pool was resized down while
+      // a bunch of sessions were outstanding. See also deltaRemaining javadoc.
+      while (true) {
+        int remainingToKill = -deltaRemaining.get();
+        if (remainingToKill <= 0) break; // No need to kill anything.
+        if (deltaRemaining.compareAndSet(-remainingToKill, -remainingToKill + 1)) {
+          return false;
+        }
+      }
+      // If there are async requests, satisfy them first.
+      if (!asyncRequests.isEmpty() && session.tryUse(false)) {
+        future = asyncRequests.poll();
+      }
+      if (future == null) {
+        // Put session into the pool.
+        if (isFirst) {
+          pool.addFirst(session);
+        } else {
+          pool.addLast(session);
+        }
+        notEmpty.signalAll();
+      }
+    } finally {
+      poolLock.unlock();
     }
+    if (future != null) {
+      future.set(session);
+    }
+    return true;
   }
 
-  void replaceSession(SessionType oldSession, SessionType newSession,
-      boolean keepTmpDir, String[] additionalFilesArray, HiveConf conf) throws Exception {
+  void replaceSession(SessionType oldSession, boolean keepTmpDir,
+      String[] additionalFilesArray) throws Exception {
     // Retain the stuff from the old session.
     // Re-setting the queue config is an old hack that we may remove in future.
+    SessionType newSession = sessionObjFactory.create(oldSession);
     Path scratchDir = oldSession.getTezScratchDir();
     String queueName = oldSession.getQueueName();
     Set<String> additionalFiles = null;
@@ -171,12 +255,16 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
     }
     try {
       oldSession.close(keepTmpDir);
-      boolean wasRemoved = defaultQueuePool.remove(oldSession);
-      if (!wasRemoved) {
-        LOG.error("Old session was closed but it was not in the pool", oldSession);
+    } finally {
+      poolLock.lock();
+      try {
+        // The expiring session may or may not be in the pool.
+        pool.remove(oldSession);
+      } finally {
+        poolLock.unlock();
       }
+
       bySessionId.remove(oldSession.getSessionId());
-    } finally {
       // There's some bogus code that can modify the queue name. Force-set it for pool sessions.
       // TODO: this might only be applicable to TezSessionPoolManager; try moving it there?
       newSession.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, queueName);
@@ -184,18 +272,36 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       // registry again just in case. TODO: maybe we should enforce that.
       configureAmRegistry(newSession);
       newSession.open(additionalFiles, scratchDir);
-      defaultQueuePool.put(newSession);
+      if (!putSessionBack(newSession, false)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing an unneeded session " + newSession
+              + "; trying to replace " + oldSession);
+        }
+        try {
+          newSession.close(false);
+        } catch (Exception ex) {
+          LOG.error("Failed to close an unneeded session", ex);
+        }
+      }
     }
   }
 
+
   private void startInitialSession(SessionType session) throws Exception {
-    boolean isUsable = session.tryUse();
+    boolean isUsable = session.tryUse(true);
     if (!isUsable) throw new IOException(session + " is not usable at pool startup");
     session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, session.getQueueName());
     configureAmRegistry(session);
     session.open();
     if (session.stopUsing()) {
-      defaultQueuePool.put(session);
+      if (!putSessionBack(session, false)) {
+        LOG.warn("Couldn't add a session during initialization");
+        try {
+          session.close(false);
+        } catch (Exception ex) {
+          LOG.error("Failed to close an unneeded session", ex);
+        }
+      }
     }
   }
 
@@ -230,7 +336,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
     @Override
     public void onUpdate(TezAmInstance serviceInstance) {
       // Presumably we'd get those later if AM updates its stuff.
-      LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance);
+      LOG.info("Received an unexpected update for instance={}. Ignoring", serviceInstance);
     }
 
     @Override
@@ -239,11 +345,126 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       // For now, we don't take any action. In future, we might restore the session based
       // on this and get rid of the logic outside of the pool that replaces/reopens/etc.
       LOG.warn("AM for " + sessionId + " has disappeared from the registry");
+      // TODO: this might race if AM for the same session is restarted internally by Tez.
+      //        It is possible to receive the create before remove and remove the wrong one.
+      //        We need some identity in the value to make sure that doesn't happen.
       bySessionId.remove(sessionId);
     }
   }
 
+  @VisibleForTesting
   int getInitialSize() {
     return initialSize;
   }
+
+  /**
+   * Resizes the pool asynchronously; shrinking returns an already-completed future.
+   * @param delta The number of sessions to add (positive) or remove (negative); 0 is a no-op.
+   * @param toClose An output list that receives newly-unneeded sessions for the caller to close.
+   */
+  public ListenableFuture<?> resizeAsync(int delta, List<SessionType> toClose) {
+    if (delta == 0) return createDummyFuture();
+    poolLock.lock();
+    try {
+      if (delta < 0) {
+        return resizeDownInternal(-delta, toClose);
+      } else {
+        return resizeUpInternal(delta);
+      }
+    } finally {
+      poolLock.unlock();
+    }
+  }
+
+  private ListenableFuture<?> resizeUpInternal(int delta) {
+    // 1) Add to the shared counter: a negative value is pending kills, which this cancels to
+    //    avoid killing the returned sessions; a positive result is the number left to create.
+    int oldVal;
+    do {
+      oldVal = deltaRemaining.get();
+    } while (!deltaRemaining.compareAndSet(oldVal, oldVal + delta));
+    int toStart = oldVal + delta;
+    if (toStart <= 0) return createDummyFuture(); // Only offset pending kills; nothing to start.
+    LOG.info("Resizing the pool; adding " + toStart + " sessions");
+
+    // 2) If we need to create some extra sessions, we'd do it just like startup does.
+    int threadCount = Math.max(1, Math.min(toStart,
+        HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)));
+    List<ListenableFutureTask<Boolean>> threadTasks = new ArrayList<>(threadCount);
+    // This is an async method, so always launch threads, even for a single task.
+    for (int i = 0; i < threadCount; ++i) {
+      ListenableFutureTask<Boolean> task = ListenableFutureTask.create(
+          new CreateSessionsRunnable(deltaRemaining));
+      new Thread(task, "Tez pool resize " + i).start();
+      threadTasks.add(task);
+    }
+    return Futures.allAsList(threadTasks); // Completes when every creation thread has finished.
+  }
+
+  private ListenableFuture<Boolean> resizeDownInternal(int delta, List<SessionType> toClose) {
+    // 1) Cancel the previous expansion, if any, by consuming its not-yet-started creations.
+    while (true) {
+      int expansionCount = deltaRemaining.get();
+      if (expansionCount <= 0) break; // No expansion is pending.
+      int expansionCancelled = Math.min(expansionCount, delta);
+      if (deltaRemaining.compareAndSet(expansionCount, expansionCount - expansionCancelled)) {
+        delta -= expansionCancelled;
+        break;
+      }
+    }
+    // 2) Drain unused sessions; the close() is sync so delegate to the caller.
+    while (delta > 0) {
+      SessionType session = pool.poll();
+      if (session == null) break; // The pool has no more sessions to give up.
+      if (!session.tryUse(true)) continue; // Already expired; it does not count as removed.
+      toClose.add(session);
+      --delta;
+    }
+    // 3) If too many sessions are outstanding (e.g. due to expiration restarts - should
+    //    not happen with in-use sessions because WM already kills the extras), we will kill
+    //    them as they come back from restarts.
+    if (delta > 0) {
+      int oldVal;
+      do {
+        oldVal = deltaRemaining.get();
+      } while (!deltaRemaining.compareAndSet(oldVal, oldVal - delta));
+    }
+    return createDummyFuture();
+  }
+
+  private ListenableFuture<Boolean> createDummyFuture() { // An already-completed future.
+    SettableFuture<Boolean> f = SettableFuture.create();
+    f.set(true);
+    return f;
+  }
+
+  private final class CreateSessionsRunnable implements Callable<Boolean> {
+    private final AtomicInteger remaining; // Shared count of sessions still to be created.
+
+    private CreateSessionsRunnable(AtomicInteger remaining) {
+      this.remaining = remaining;
+    }
+
+    public Boolean call() throws Exception {
+      if (parentSessionState != null) {
+        SessionState.setCurrentSessionState(parentSessionState);
+      }
+      while (true) {
+        int oldVal = remaining.get();
+        if (oldVal <= 0) return true; // Nothing left to create (or the expansion was cancelled).
+        if (!remaining.compareAndSet(oldVal, oldVal - 1)) continue; // Lost a race; re-read.
+        startInitialSession(sessionObjFactory.create(null)); // null: not replacing a session.
+      }
+    }
+  }
+
+  @VisibleForTesting
+  int getCurrentSize() { // Returns the number of sessions currently sitting in the pool queue.
+    poolLock.lock(); // Not tryLock(): if that failed, the unlock() below would throw.
+    try {
+      return pool.size();
+    } finally {
+      poolLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 9b4714f..15543d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import javax.security.auth.login.LoginException;
+import org.apache.tez.dag.api.TezException;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -102,7 +107,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
 
   public void startPool() throws Exception {
     if (defaultSessionPool != null) {
-      defaultSessionPool.startInitialSessions();
+      defaultSessionPool.start();
     }
     if (expirationTracker != null) {
       expirationTracker.start();
@@ -110,7 +115,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   }
 
   public void setupPool(HiveConf conf) throws Exception {
-    String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
+    final String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
         conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
     this.initConf = conf;
     int emptyNames = 0; // We don't create sessions for empty entries.
@@ -123,7 +128,37 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames);
     if (numSessionsTotal > 0) {
       boolean enableAmRegistry = false;
-      defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry);
+      defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry,
+          new TezSessionPool.SessionObjectFactory<TezSessionPoolSession>() {
+            int queueIx = 0;
+
+            @Override
+            public TezSessionPoolSession create(TezSessionPoolSession oldSession) {
+              if (oldSession != null) {
+                return createAndInitSession(
+                    oldSession.getQueueName(), oldSession.isDefault(), oldSession.getConf());
+              }
+              // We never resize the pool, so assume this is initialization.
+              // If that changes, we might have to make the factory interface more complicated.
+              /*
+               * In a single-threaded init case, with this the ordering of sessions in the queue will be
+               * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform
+               * distribution of the sessions across queues at least to begin with. Then as sessions get
+               * freed up, the list may change this ordering.
+               * In a multi threaded init case it's a free for all.
+               */
+              int localQueueIx;
+              synchronized (defaultQueueList) {
+                // Skip empty entries as before; numSessionsTotal > 0 guarantees a non-empty one.
+                do {
+                  localQueueIx = queueIx;
+                  queueIx = (queueIx + 1) % defaultQueueList.length;
+                } while (defaultQueueList[localQueueIx].isEmpty());
+              }
+              HiveConf sessionConf = new HiveConf(initConf);
+              return createAndInitSession(defaultQueueList[localQueueIx], true, sessionConf);
+          }
+      });
     }
 
     numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
@@ -155,24 +190,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     if (!hasInitialSessions) {
       return;
     }
-
-
-    /*
-     * In a single-threaded init case, with this the ordering of sessions in the queue will be
-     * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform
-     * distribution of the sessions across queues at least to begin with. Then as sessions get
-     * freed up, the list may change this ordering.
-     * In a multi threaded init case it's a free for all.
-     */
-    for (int i = 0; i < numSessions; i++) {
-      for (String queueName : defaultQueueList) {
-        if (queueName.isEmpty()) {
-          continue;
-        }
-        HiveConf sessionConf = new HiveConf(initConf);
-        defaultSessionPool.addInitialSession(createAndInitSession(queueName, true, sessionConf));
-      }
-    }
   }
 
   // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration
@@ -438,7 +455,14 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
         && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
       sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
     }
+    reopenInternal(sessionState, additionalFiles);
+    return sessionState;
+  }
+
+  static void reopenInternal(
+      TezSessionState sessionState, String[] additionalFiles) throws Exception {
     Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf();
+    // TODO: implies the session files and the array are the same if not null; why? very brittle
     if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty())
         && (additionalFiles != null)) {
       oldAdditionalFiles = new HashSet<>();
@@ -449,11 +473,11 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     // TODO: close basically resets the object to a bunch of nulls.
     //       We should ideally not reuse the object because it's pointless and error-prone.
     sessionState.close(true);
-    // TODO: should we reuse scratchDir too?
+    // Note: scratchdir is reused implicitly because the sessionId is the same.
     sessionState.open(oldAdditionalFiles, null);
-    return sessionState;
   }
 
+
   public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
     List<TezSessionPoolSession> sessionsToClose = null;
     synchronized (openSessions) {
@@ -467,14 +491,14 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
 
   /** Closes a running (expired) pool session and reopens it. */
   @Override
-  public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
+  public void closeAndReopenExpiredSession(TezSessionPoolSession oldSession) throws Exception {
     String queueName = oldSession.getQueueName();
     if (queueName == null) {
       LOG.warn("Pool session has a null queue: " + oldSession);
     }
     TezSessionPoolSession newSession = createAndInitSession(
       queueName, oldSession.isDefault(), oldSession.getConf());
-    defaultSessionPool.replaceSession(oldSession, newSession, false, null, null);
+    defaultSessionPool.replaceSession(oldSession, false, null);
   }
 
   /** Called by TezSessionPoolSession when opened. */

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 6135223..6887d7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+
+import org.apache.hadoop.conf.Configuration;
+
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Collection;
@@ -139,28 +145,28 @@ class TezSessionPoolSession extends TezSessionState {
    * Tries to use this session. When the session is in use, it will not expire.
    * @return true if the session can be used; false if it has already expired.
    */
-  public boolean tryUse() throws Exception {
+  public boolean tryUse(boolean ignoreExpiration) {
     while (true) {
       int oldValue = sessionState.get();
       if (oldValue == STATE_IN_USE) throw new AssertionError(this + " is already in use");
       if (oldValue == STATE_EXPIRED) return false;
-      int finalState = shouldExpire() ? STATE_EXPIRED : STATE_IN_USE;
+      int finalState = (!ignoreExpiration && shouldExpire()) ? STATE_EXPIRED : STATE_IN_USE;
       if (sessionState.compareAndSet(STATE_NONE, finalState)) {
         if (finalState == STATE_IN_USE) return true;
         // Restart asynchronously, don't block the caller.
-        expirationTracker.closeAndRestartExpiredSession(this, true);
+        expirationTracker.closeAndRestartExpiredSessionAsync(this);
         return false;
       }
     }
   }
 
-  boolean stopUsing() throws Exception {
+  boolean stopUsing() {
     int finalState = shouldExpire() ? STATE_EXPIRED : STATE_NONE;
     if (!sessionState.compareAndSet(STATE_IN_USE, finalState)) {
       throw new AssertionError("Unexpected state change; currently " + sessionState.get());
     }
     if (finalState == STATE_NONE) return true;
-    expirationTracker.closeAndRestartExpiredSession(this, true);
+    expirationTracker.closeAndRestartExpiredSessionAsync(this);
     return false;
   }
 
@@ -176,13 +182,17 @@ class TezSessionPoolSession extends TezSessionState {
     while (true) {
       if (sessionState.get() != STATE_NONE) return true; // returnAfterUse will take care of this
       if (sessionState.compareAndSet(STATE_NONE, STATE_EXPIRED)) {
-        expirationTracker.closeAndRestartExpiredSession(this, isAsync);
+        if (isAsync) {
+          expirationTracker.closeAndRestartExpiredSessionAsync(this);
+        } else {
+          expirationTracker.closeAndRestartExpiredSession(this);
+        }
         return true;
       }
     }
   }
 
-  private boolean shouldExpire() {
+  private final boolean shouldExpire() {
     return expirationNs != null && (System.nanoTime() - expirationNs) >= 0;
   }
 
@@ -209,4 +219,4 @@ class TezSessionPoolSession extends TezSessionState {
   void updateFromRegistry(TezAmInstance si) {
     // Nothing to do.
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 1448168..541ee73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -629,7 +629,7 @@ public class TezSessionState {
     }
   }
 
-  public void cleanupScratchDir () throws IOException {
+  protected final void cleanupScratchDir () throws IOException {
     FileSystem fs = tezScratchDir.getFileSystem(conf);
     fs.delete(tezScratchDir, true);
     tezScratchDir = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 81d6b85..96dc7d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -41,13 +41,12 @@ class UserPoolMapping {
   private final String defaultPoolName;
   // TODO: add other types as needed
 
-  public UserPoolMapping(List<TmpUserMapping> mappings, Set<String> poolNames) {
+  public UserPoolMapping(List<TmpUserMapping> mappings) {
     String defaultPoolName = null;
     for (TmpUserMapping mapping : mappings) {
       switch (mapping.getType()) {
       case USER: {
-          String poolName = getValidPoolName(poolNames, mapping);
-          Mapping val = new Mapping(poolName, mapping.getPriority());
+          Mapping val = new Mapping(mapping.getPoolName(), mapping.getPriority());
           Mapping oldValue = userMappings.put(mapping.getName(), val);
           if (oldValue != null) {
             throw new AssertionError("Duplicate mapping for user " + mapping.getName() + "; "
@@ -56,7 +55,7 @@ class UserPoolMapping {
         break;
       }
       case DEFAULT: {
-        String poolName = getValidPoolName(poolNames, mapping);
+        String poolName = mapping.getPoolName();
         if (defaultPoolName != null) {
           throw new AssertionError("Duplicate default mapping; "
               + defaultPoolName + " and " + poolName);
@@ -80,13 +79,4 @@ class UserPoolMapping {
     if (userMapping != null) return userMapping.fullPoolName;
     return defaultPoolName;
   }
-
-  private static String getValidPoolName(Set<String> poolNames, TmpUserMapping mapping) {
-    String poolName = mapping.getPoolName();
-    // Should we really be validating here? The plan should be validated before applying.
-    if (!poolNames.contains(mapping.getPoolName())) {
-      throw new AssertionError("Invalid pool in the mapping " + poolName);
-    }
-    return poolName;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 00501ee..0dd1433 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -18,18 +18,30 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.util.concurrent.TimeoutException;
+import com.google.common.util.concurrent.ListenableFuture;
 
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 
 public class WmTezSession extends TezSessionPoolSession implements AmPluginNode {
   private String poolName;
   private double clusterFraction;
+  private String killReason = null;
 
   private final Object amPluginInfoLock = new Object();
   private AmPluginInfo amPluginInfo = null;
+  private SettableFuture<WmTezSession> amRegistryFuture = null;
+  private ScheduledFuture<?> timeoutTimer = null;
 
+  private final WorkloadManager wmParent;
 
   /** The actual state of the guaranteed task, and the update state, for the session. */
   // Note: hypothetically, a generic WM-aware-session should not know about guaranteed tasks.
@@ -41,34 +53,60 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   }
   private final ActualWmState actualState = new ActualWmState();
 
-  public WmTezSession(String sessionId, Manager parent,
+  public WmTezSession(String sessionId, WorkloadManager parent,
       SessionExpirationTracker expiration, HiveConf conf) {
     super(sessionId, parent, expiration, conf);
+    wmParent = parent;
   }
 
-  @Override
-  public AmPluginInfo waitForAmPluginInfo(int timeoutMs)
-      throws InterruptedException, TimeoutException {
+  @VisibleForTesting
+  WmTezSession(String sessionId, Manager testParent,
+      SessionExpirationTracker expiration, HiveConf conf) {
+    super(sessionId, testParent, expiration, conf);
+    wmParent = null;
+  }
+
+  public ListenableFuture<WmTezSession> waitForAmRegistryAsync(
+      int timeoutMs, ScheduledExecutorService timeoutPool) {
+    SettableFuture<WmTezSession> future = SettableFuture.create();
     synchronized (amPluginInfoLock) {
-      if (amPluginInfo == null) {
-        amPluginInfoLock.wait(timeoutMs);
-        if (amPluginInfo == null) {
-          throw new TimeoutException("No plugin information for " + getSessionId());
-        }
+      if (amPluginInfo != null) {
+        future.set(this);
+        return future;
       }
-      return amPluginInfo;
+      if (amRegistryFuture != null) {
+        // We don't need this for now, so do not support it.
+        future.setException(new RuntimeException("Multiple waits are not suported"));
+        return future;
+      }
+      amRegistryFuture = future;
+      if (timeoutMs <= 0) return future;
+      // TODO: replace with withTimeout after we get the relevant guava upgrade.
+      this.timeoutTimer = timeoutPool.schedule(
+          new TimeoutRunnable(), timeoutMs, TimeUnit.MILLISECONDS);
     }
+    return future;
   }
 
+
   @Override
   void updateFromRegistry(TezAmInstance si) {
+    AmPluginInfo info = new AmPluginInfo(si.getHost(), si.getPluginPort(),
+        si.getPluginToken(), si.getPluginTokenJobId());
     synchronized (amPluginInfoLock) {
-      this.amPluginInfo = new AmPluginInfo(si.getHost(), si.getPluginPort(),
-          si.getPluginToken(), si.getPluginTokenJobId());
-      amPluginInfoLock.notifyAll();
+      this.amPluginInfo = info;
+      if (amRegistryFuture != null) {
+        amRegistryFuture.set(this);
+        amRegistryFuture = null;
+      }
+      if (timeoutTimer != null) {
+        timeoutTimer.cancel(true);
+        timeoutTimer = null;
+      }
     }
   }
 
+  @Override
   public AmPluginInfo getAmPluginInfo() {
     return amPluginInfo; // Only has final fields, no artifacts from the absence of sync.
   }
@@ -85,6 +123,11 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     this.clusterFraction = fraction;
   }
 
+  void clearWm() {
+    this.poolName = null;
+    this.clusterFraction = 0f;
+  }
+
   double getClusterFraction() {
     return this.clusterFraction;
   }
@@ -118,4 +161,42 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
       return (actualState.sent == actualState.target);
     }
   }
+
+  public void handleUpdateError() {
+    wmParent.addUpdateError(this);
+  }
+
+  @Override
+  public int hashCode() {
+    return System.identityHashCode(this);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj == this;
+  }
+
+  boolean isIrrelevantForWm() {
+    return killReason != null;
+  }
+
+  String getReasonForKill() {
+    return killReason;
+  }
+
+  void setIsIrrelevantForWm(String killReason) {
+    this.killReason = killReason;
+  }
+
+  private final class TimeoutRunnable implements Runnable {
+    @Override
+    public void run() {
+      synchronized (amPluginInfoLock) {
+        timeoutTimer = null;
+        if (amRegistryFuture == null || amRegistryFuture.isDone()) return;
+        amRegistryFuture.cancel(true);
+        amRegistryFuture = null;
+      }
+    }
+  }
 }