You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/02 18:40:51 UTC

[2/3] hive git commit: HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 35e5710..b0c6d58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,20 +17,34 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Semaphore;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -38,12 +52,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,13 +59,14 @@ import org.slf4j.LoggerFactory;
 
 /** Workload management entry point for HS2. */
 public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
-  implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
+    implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
   private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
   // TODO: this is a temporary setting that will go away, so it's not in HiveConf.
   public static final String TEST_WM_CONFIG = "hive.test.workload.management";
+  private static final char POOL_SEPARATOR = '/';
 
   private final HiveConf conf;
-  private final TezSessionPool<WmTezSession> sessions;
+  private final TezSessionPool<WmTezSession> tezAmPool;
   private final SessionExpirationTracker expirationTracker;
   private final RestrictedConfigChecker restrictedConfig;
   private final QueryAllocationManager allocationManager;
@@ -69,56 +78,50 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       new IdentityHashMap<>();
   private final int amRegistryTimeoutMs;
 
-
-  /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */
-  private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock();
-  private final Map<String, PoolState> pools = new HashMap<>();
+  // Note: pools can only be modified by the master thread.
+  private HashMap<String, PoolState> pools;
   // Used to make sure that waiting getSessions don't block update.
-  private int internalPoolsVersion;
   private UserPoolMapping userPoolMapping;
+  private int totalQueryParallelism;
+  // We index the get requests to make sure there are no ordering artifacts when we requeue.
+  private final AtomicLong getRequestVersion = new AtomicLong(Long.MIN_VALUE);
 
   private SessionTriggerProvider sessionTriggerProvider;
   private TriggerActionHandler triggerActionHandler;
   private TriggerValidatorRunnable triggerValidatorRunnable;
 
-  public static class PoolState {
-    // Add stuff here as WM is implemented.
-    private final Object lock = new Object();
-    private final List<WmTezSession> sessions = new ArrayList<>();
-    private final Semaphore sessionsClaimed;
+  // Note: we could use RW lock to allow concurrent calls for different sessions, however all
+  //       those calls do is add elements to lists and maps; and we'd need to sync those
+  //       separately, plus have an object to notify because RW lock does not support conditions
+  //       in any sensible way. So, for now the lock is going to be epic.
+  private final ReentrantLock currentLock = new ReentrantLock();
+  private final Condition hasChangesCondition = currentLock.newCondition();
+  // The processing thread will switch between these two objects.
+  private final EventState one = new EventState(), two = new EventState();
+  private boolean hasChanges = false;
+  private EventState current = one;
 
-    private final String fullName;
-    private final double finalFraction;
-    private double finalFractionRemaining;
-    private final int queryParallelism;
-    private List<Trigger> triggers = new ArrayList<>();
-
-    public PoolState(String fullName, int queryParallelism, double fraction) {
-      this.fullName = fullName;
-      this.queryParallelism = queryParallelism;
-      // A fair semaphore to ensure correct queue order.
-      this.sessionsClaimed = new Semaphore(queryParallelism, true);
-      this.finalFraction = this.finalFractionRemaining = fraction;
-    }
+  /** The master thread that processes the events from EventState. */
+  @VisibleForTesting
+  protected final Thread wmThread;
+  /** Used by the master thread to offload calls blocking on something other than fast locks. */
+  private final ExecutorService workPool;
+  /** Used to schedule timeouts for some async operations. */
+  private final ScheduledExecutorService timeoutPool;
+  private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
 
+  @SuppressWarnings("rawtypes")
+  private final FutureCallback FATAL_ERROR_CALLBACK = new FutureCallback() {
     @Override
-    public String toString() {
-      return "[" + fullName + ", query parallelism " + queryParallelism
-          + ", fraction of the cluster " + finalFraction + ", fraction used by child pools "
-          + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size()
-          + "]";
-    }
-
-    @VisibleForTesting
-    // will change in HIVE-17809
-    public void setTriggers(final List<Trigger> triggers) {
-      this.triggers = triggers;
+    public void onSuccess(Object result) {
     }
 
-    public List<Trigger> getTriggers() {
-      return triggers;
+    @Override
+    public void onFailure(Throwable t) {
+      // TODO: shut down HS2?
+      LOG.error("Workload management fatal error", t);
     }
-  }
+  };
 
   // TODO: this is temporary before HiveServerEnvironment is merged.
   private static volatile WorkloadManager INSTANCE;
@@ -135,71 +138,86 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   /** Called once, when HS2 initializes. */
   public static WorkloadManager create(String yarnQueue, HiveConf conf, TmpResourcePlan plan) {
     assert INSTANCE == null;
-    Token<JobTokenIdentifier> amsToken = createAmsToken();
     // We could derive the expected number of AMs to pass in.
-    LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, amsToken, -1);
+    LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, null, -1);
     QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm);
-    return (INSTANCE = new WorkloadManager(yarnQueue, conf, qam, amsToken, plan));
-  }
-
-  private static Token<JobTokenIdentifier> createAmsToken() {
-    if (!UserGroupInformation.isSecurityEnabled()) return null;
-    // This application ID is completely bogus.
-    ApplicationId id = ApplicationId.newInstance(
-        System.nanoTime(), (int)(System.nanoTime() % 100000));
-    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString()));
-    JobTokenSecretManager jobTokenManager = new JobTokenSecretManager();
-    Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jobTokenManager);
-    sessionToken.setService(identifier.getJobId());
-    return sessionToken;
+    return (INSTANCE = new WorkloadManager(yarnQueue, conf, qam, plan));
   }
 
   @VisibleForTesting
   WorkloadManager(String yarnQueue, HiveConf conf,
-      QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken, TmpResourcePlan plan) {
+      QueryAllocationManager qam, TmpResourcePlan plan) {
     this.yarnQueue = yarnQueue;
     this.conf = conf;
-    int numSessions = initializeHivePools(plan);
-    LOG.info("Initializing with " + numSessions + " total query parallelism");
+    this.totalQueryParallelism = applyInitialResourcePlan(plan);
+    LOG.info("Initializing with " + totalQueryParallelism + " total query parallelism");
 
     this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(
         conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
-    sessions = new TezSessionPool<>(conf, numSessions, true);
+    tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true,
+        new TezSessionPool.SessionObjectFactory<WmTezSession>() {
+          @Override
+          public WmTezSession create(WmTezSession oldSession) {
+            return createSession(oldSession == null ? null : oldSession.getConf());
+          }
+    });
     restrictedConfig = new RestrictedConfigChecker(conf);
     allocationManager = qam;
     // Only creates the expiration tracker if expiration is configured.
     expirationTracker = SessionExpirationTracker.create(conf, this);
-    for (int i = 0; i < numSessions; i++) {
-      sessions.addInitialSession(createSession());
-    }
+    ThreadFactory workerFactory = new ThreadFactory() {
+      private final AtomicInteger threadNumber = new AtomicInteger(-1);
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r, "Workload management worker " + threadNumber.incrementAndGet());
+        t.setDaemon(true);
+        return t;
+      }
+    };
+    workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf,
+        ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS), workerFactory);
+    ThreadFactory timeoutFactory = new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r, "Workload management timeout thread");
+        t.setDaemon(true);
+        return t;
+      }
+    };
+    timeoutPool = Executors.newScheduledThreadPool(1, timeoutFactory);
+
+    wmThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        runWmThread();
+      }
+    }, "Workload management master");
+    wmThread.setDaemon(true);
     // TODO: add support for per pool action handler and triggers fetcher (+atomic update to active triggers)
     sessionTriggerProvider = new SessionTriggerProvider();
     triggerActionHandler = new TriggerViolationActionHandler();
-    triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler());
+    triggerValidatorRunnable = new TriggerValidatorRunnable(
+        getSessionTriggerProvider(), getTriggerActionHandler());
     startTriggerValidator(conf);
   }
 
-  private int initializeHivePools(TmpResourcePlan plan) {
-    poolsLock.writeLock().lock();
-    try {
-      // FIXME: Add Triggers from metastore to poolstate
-      // Note: we assume here that plan has been validated beforehand, so we don't verify
-      //       that fractions or query parallelism add up.
-      int totalQueryParallelism = 0;
-      // Use recursion to update parents more conveniently; we don't expect a big tree.
-      for (TmpHivePool pool : plan.getRootPools()) {
-        totalQueryParallelism += addHivePool(pool, null);
-      }
-      this.userPoolMapping = new UserPoolMapping(plan.getMappings(), pools.keySet());
-      internalPoolsVersion = 0; // Initializing for the first time.
-      return totalQueryParallelism;
-    } finally {
-      poolsLock.writeLock().unlock();
+  // TODO: remove and let the thread handle it via normal ways?
+  private int applyInitialResourcePlan(TmpResourcePlan plan) {
+    int totalQueryParallelism = 0;
+    // Note: we assume here that plan has been validated beforehand, so we don't verify
+    //       that fractions or query parallelism add up.
+    this.userPoolMapping = new UserPoolMapping(plan.getMappings());
+    assert pools == null;
+    pools = new HashMap<>();
+    // Use recursion to update parents more conveniently; we don't expect a big tree.
+    for (TmpHivePool pool : plan.getRootPools()) {
+      totalQueryParallelism += addInitialHivePool(pool, null);
     }
+    return totalQueryParallelism;
   }
 
-  private final static char POOL_SEPARATOR = '/';
-  private int addHivePool(TmpHivePool pool, PoolState parent) {
+  // TODO: remove and let the thread handle it via normal ways?
+  private int addInitialHivePool(TmpHivePool pool, PoolState parent) {
     String fullName = pool.getName();
     int totalQueryParallelism = pool.getQueryParallelism();
     double fraction = pool.getResourceFraction();
@@ -211,178 +229,804 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     PoolState state = new PoolState(fullName, totalQueryParallelism, fraction);
     if (pool.getChildren() != null) {
       for (TmpHivePool child : pool.getChildren()) {
-        totalQueryParallelism += addHivePool(child, state);
+        totalQueryParallelism += addInitialHivePool(child, state);
       }
     }
-    LOG.info("Adding Hive pool: " + state);
+    state.setTriggers(pool.triggers);
+    LOG.info("Adding Hive pool: " + state + " with triggers " + pool.triggers);
     pools.put(fullName, state);
     return totalQueryParallelism;
   }
 
-  public TezSessionState getSession(
-      TezSessionState session, String userName, HiveConf conf) throws Exception {
-    validateConfig(conf);
-    WmTezSession result = checkSessionForReuse(session);
-    boolean hasAcquired = false;
-    String poolName = null;
-    while (!hasAcquired) { // This loop handles concurrent plan updates while we are waiting.
-      poolName = userPoolMapping.mapSessionToPoolName(userName);
-      if (poolName == null) {
-        throw new HiveException("Cannot find any pool mapping for user " + userName);
-      }
-      int internalVersion = -1;
-      Semaphore sessionsClaimed = null;
-      poolsLock.readLock().lock();
+  public void start() throws Exception {
+    tezAmPool.start();
+    if (expirationTracker != null) {
+      expirationTracker.start();
+    }
+    allocationManager.start();
+    wmThread.start();
+  }
+
+  public void stop() throws Exception {
+    List<TezSessionPoolSession> sessionsToClose = null;
+    synchronized (openSessions) {
+      sessionsToClose = new ArrayList<TezSessionPoolSession>(openSessions.keySet());
+    }
+    for (TezSessionState sessionState : sessionsToClose) {
+      sessionState.close(false);
+    }
+    if (expirationTracker != null) {
+      expirationTracker.stop();
+    }
+    allocationManager.stop();
+    if (wmThread != null) {
+      wmThread.interrupt();
+    }
+    workPool.shutdownNow();
+    timeoutPool.shutdownNow();
+
+    INSTANCE = null;
+  }
+
+  /** Represents a single iteration of work for the master thread. */
+  private final static class EventState {
+    private final Set<WmTezSession> toReturn = Sets.newIdentityHashSet(),
+        toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
+    private final LinkedList<SessionInitContext> initResults = new LinkedList<>();
+    private final IdentityHashMap<WmTezSession, SettableFuture<WmTezSession>> toReopen =
+        new IdentityHashMap<>();
+    private final LinkedList<GetRequest> getRequests = new LinkedList<>();
+    private final IdentityHashMap<WmTezSession, GetRequest> toReuse = new IdentityHashMap<>();
+    private TmpResourcePlan resourcePlanToApply = null;
+    private boolean hasClusterStateChanged = false;
+    private SettableFuture<Boolean> testEvent, applyRpFuture;
+  }
+
+  /**
+   * The work delegated from the master thread that doesn't have an async implementation
+   * (mostly opening and closing the sessions).
+   */
+  private final static class WmThreadSyncWork {
+    private LinkedList<WmTezSession> toRestartInUse = new LinkedList<>(),
+        toDestroyNoRestart = new LinkedList<>();
+  }
+
+  private void runWmThread() {
+    while (true) {
+      EventState currentEvents = null;
+      currentLock.lock();
       try {
-        PoolState pool = pools.get(poolName);
-        if (pool == null) throw new AssertionError("Pool " + poolName + " not found.");
-        // No need to take the pool lock, semaphore is final.
-        sessionsClaimed = pool.sessionsClaimed;
-        internalVersion = internalPoolsVersion;
-      } finally {
-        poolsLock.readLock().unlock();
-      }
-      // One cannot simply reuse the session if there are other queries waiting; to maintain
-      // fairness, we'll try to take the semaphore instantly, and if that fails we'll return
-      // this session back to the pool and potentially give the user a new session later.
-      if (result != null) {
-        // Handle the special case; the pool may be exactly at capacity w/o queue. In that
-        // case, we still should be able to reuse.
-        boolean isFromTheSamePool = false;
-        String oldPoolName = result.getPoolName();
-        if (poolName.equals(oldPoolName)) {
-          sessionsClaimed.release();
-          isFromTheSamePool = true;
-        }
-        // Note: we call timed acquire because untimed one ignores fairness.
-        hasAcquired = sessionsClaimed.tryAcquire(1, TimeUnit.MILLISECONDS);
-        if (hasAcquired) {
-          poolsLock.readLock().lock();
-          boolean doUnlock = true;
+        while (!hasChanges) {
           try {
-            if (internalVersion == internalPoolsVersion) {
-              if (!isFromTheSamePool) {
-                // Free up the usage in the old pool. TODO: ideally not under lock; not critical.
-                redistributePoolAllocations(oldPoolName, null, result, true);
-              }
-              doUnlock = false; // Do not unlock; see below.
-              break;
-            }
-          } finally {
-            if (doUnlock) {
-              poolsLock.readLock().unlock();
-            }
+            hasChangesCondition.await(1, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            LOG.warn("WM thread was interrupted and will now exit");
+            return;
           }
-          hasAcquired = false;
         }
-        // Note: we are short-circuiting session::returnToSessionManager to supply the flag
-        returnAfterUse(result, !isFromTheSamePool);
-        result = null;
+        hasChanges = false;
+        currentEvents = current;
+        current = (currentEvents == one) ? two : one;
+      } finally {
+        currentLock.unlock();
       }
-      // We don't expect frequent updates, so check every second.
-      while (!(hasAcquired = (hasAcquired || sessionsClaimed.tryAcquire(1, TimeUnit.SECONDS)))) {
-        poolsLock.readLock().lock();
-        try {
-          if (internalVersion != internalPoolsVersion) break;
-        } finally {
-          poolsLock.readLock().unlock();
+      try {
+        LOG.debug("Processing current events");
+        processCurrentEvents(currentEvents, syncWork);
+        scheduleWork(syncWork);
+      } catch (InterruptedException ex) {
+        LOG.warn("WM thread was interrupted and will now exit");
+        return;
+      } catch (Exception | AssertionError ex) {
+        LOG.error("WM thread encountered an error but will attempt to continue", ex);
+        if (currentEvents.testEvent != null) {
+          currentEvents.testEvent.setException(ex);
+          currentEvents.testEvent = null;
         }
+        if (currentEvents.applyRpFuture != null) {
+          currentEvents.applyRpFuture.setException(ex);
+          currentEvents.applyRpFuture = null;
+        }
+        // TODO: we either have to kill HS2 or, as the non-actor model would implicitly,
+        //       hope for the best and continue on other threads. Do the latter for now.
+        continue;
       }
-      if (!hasAcquired) continue;
-      // Keep it simple for now - everything between acquiring the semaphore and adding the session
-      // to the pool state is done under read lock, blocking pool updates. It's possible to make
-      // it more granular if needed. The only potentially lengthy operation is waiting for an
-      // expired session to be restarted in the session pool.
-      poolsLock.readLock().lock();
-      if (internalVersion == internalPoolsVersion) break;
-      poolsLock.readLock().unlock();
-      hasAcquired = false;
-    }
-    // We are holding the lock from the end of the loop.
-    try {
-      assert hasAcquired;
-      while (true) {
-        // TODO: ideally, we'd need to implement tryGet and deal with the valid wait from a session
-        //       restarting somehow, as opposed to the invalid case of a session missing from the
-        //       pool due to some bug. Keep a "restarting" counter in the pool?
-        boolean isFromTheSamePool = false;
-        if (result == null) {
-          result = sessions.getSession();
-        } else {
-          // If we are just reusing the session from the same pool, do not adjust allocations.
-          isFromTheSamePool = poolName.equals(result.getPoolName());
+    }
+  }
+
+  private void scheduleWork(WmThreadSyncWork context) {
+    // Do the work that cannot be done via async calls.
+
+    // 1. Restart pool sessions.
+    for (final WmTezSession toRestart : context.toRestartInUse) {
+      LOG.debug("Replacing " + toRestart + " with a new session");
+      workPool.submit(() -> {
+        try {
+          // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
+          tezAmPool.replaceSession(toRestart, false, null);
+        } catch (Exception ex) {
+          LOG.error("Failed to restart an old session; ignoring " + ex.getMessage());
         }
-        result.setQueueName(yarnQueue);
-        result.setPoolName(poolName);
-        if (!ensureAmIsRegistered(result)) continue; // Try another.
-        if (!isFromTheSamePool) {
-          redistributePoolAllocations(poolName, result, null, false);
+      });
+    }
+    context.toRestartInUse.clear();
+    // 2. Destroy the sessions that we don't need anymore.
+    for (final WmTezSession toDestroy : context.toDestroyNoRestart) {
+      LOG.debug("Closing " + toDestroy + " without restart");
+      workPool.submit(() -> {
+        try {
+          toDestroy.close(false);
+        } catch (Exception ex) {
+          LOG.error("Failed to close an old session; ignoring " + ex.getMessage());
         }
-        return result;
+      });
+    }
+    context.toDestroyNoRestart.clear();
+  }
+
+  private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throws Exception {
+    // The order of processing is as follows. We'd reclaim or kill all the sessions that we can
+    // reclaim from various user actions and errors, then apply the new plan if any,
+    // then give out all we can give out (restart, get and reopen callers) and rebalance the
+    // resource allocations in all the affected pools.
+    // For every session, we'd check all the concurrent things happening to it.
+
+    // TODO: also account for Tez-internal session restarts;
+    //       AM reg info changes; add notifications, ignore errors, and update alloc.
+    HashSet<String> poolsToRedistribute = new HashSet<>();
+
+    // 0. Handle initialization results.
+    for (SessionInitContext sw : e.initResults) {
+      handleInitResultOnMasterThread(sw, syncWork, poolsToRedistribute);
+    }
+    e.initResults.clear();
+
+    // 1. Handle sessions that are being destroyed by users. Destroy implies return.
+    for (WmTezSession sessionToDestroy : e.toDestroy) {
+      if (e.toReturn.remove(sessionToDestroy)) {
+        LOG.warn("The session was both destroyed and returned by the user; destroying");
       }
-    } finally {
-      poolsLock.readLock().unlock();
+      LOG.debug("Destroying {}", sessionToDestroy);
+      Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
+          e, sessionToDestroy, poolsToRedistribute);
+      if (shouldReturn == null || shouldReturn) {
+        // Restart if this session is still relevant, even if there's an internal error.
+        syncWork.toRestartInUse.add(sessionToDestroy);
+      }
+    }
+    e.toDestroy.clear();
+
+    // 2. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
+    for (WmTezSession sessionToReturn: e.toReturn) {
+      LOG.debug("Returning {}", sessionToReturn);
+      Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
+          e, sessionToReturn, poolsToRedistribute);
+      if (shouldReturn == null) {
+        // Restart if there's an internal error.
+        syncWork.toRestartInUse.add(sessionToReturn);
+        continue;
+      }
+      if (!shouldReturn) continue;
+      boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
+      if (!wasReturned) {
+        syncWork.toDestroyNoRestart.add(sessionToReturn);
+      }
+    }
+    e.toReturn.clear();
+
+    // 3. Reopen is essentially just destroy + get a new session for a session in use.
+    for (Map.Entry<WmTezSession, SettableFuture<WmTezSession>> entry : e.toReopen.entrySet()) {
+      LOG.debug("Reopening {}", entry.getKey());
+      handeReopenRequestOnMasterThread(
+          e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork);
+    }
+    e.toReopen.clear();
+
+    // 4. All the sessions in use with a failed update that were not destroyed or returned now die.
+    for (WmTezSession sessionWithUpdateError : e.updateErrors) {
+      LOG.debug("Update failed for {}", sessionWithUpdateError);
+      handleUpdateErrorOnMasterThread(sessionWithUpdateError, e, syncWork, poolsToRedistribute);
+    }
+    e.updateErrors.clear();
+
+    // 5. Now apply a resource plan if any. This is expected to be pretty rare.
+    boolean hasRequeues = false;
+    if (e.resourcePlanToApply != null) {
+      LOG.debug("Applying new resource plan");
+      int getReqCount = e.getRequests.size();
+      applyNewResourcePlanOnMasterThread(e, syncWork, poolsToRedistribute);
+      hasRequeues = getReqCount != e.getRequests.size();
+    }
+    e.resourcePlanToApply = null;
+
+    // 6. Handle all the get/reuse requests. We won't actually give out anything here, but merely
+    //    map all the requests and place them in an appropriate order in pool queues. The only
+    //    exception is the reuse without queue contention; can be granted immediately. If we can't
+    //    reuse the session immediately, we will convert the reuse to a normal get, because we
+    //    want query level fairness, and don't want the get in queue to hold up a session.
+    GetRequest req;
+    while ((req = e.getRequests.pollFirst()) != null) {
+      LOG.debug("Processing a new get request from " + req.userName);
+      queueGetRequestOnMasterThread(req, poolsToRedistribute, syncWork);
+    }
+    e.toReuse.clear();
+
+    // 7. If there was a cluster state change, make sure we redistribute all the pools.
+    if (e.hasClusterStateChanged) {
+      LOG.debug("Processing a cluster state change");
+      poolsToRedistribute.addAll(pools.keySet());
+      e.hasClusterStateChanged = false;
+    }
+
+    // 8. Finally, for all the pools that have changes, promote queued queries and rebalance.
+    for (String poolName : poolsToRedistribute) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing changes for pool " + poolName + ": " + pools.get(poolName));
+      }
+      processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
+    }
+
+    // 9. Notify tests and global async ops.
+    if (e.testEvent != null) {
+      e.testEvent.set(true);
+      e.testEvent = null;
+    }
+    if (e.applyRpFuture != null) {
+      e.applyRpFuture.set(true);
+      e.applyRpFuture = null;
     }
   }
 
-  @VisibleForTesting
-  protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
-    // Make sure AM is ready to use and registered with AM registry.
+  // ========= Master thread methods
+
+  private void handleInitResultOnMasterThread(
+      SessionInitContext sw, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    // For the failures, the users have been notified, we just need to clean up. There's no
+    // session here (or it's unused), so no conflicts are possible. We just remove it.
+    // For successes, the user has also been notified, so various requests are also possible;
+    // however, to start, we'd just put the session into the sessions list and go from there.
+    WmTezSession session = null;
+    sw.lock.lock();
     try {
-      session.waitForAmPluginInfo(amRegistryTimeoutMs);
-    } catch (TimeoutException ex) {
-      LOG.error("Timed out waiting for AM registry information for " + session.getSessionId());
-      session.destroy();
+      if (sw.state == SessionInitState.CANCELED) {
+        // We have processed this on the previous run, after it has already queued the message.
+        return;
+      }
+      assert sw.state == SessionInitState.DONE;
+      session = sw.session;
+      sw.session = null;
+    } finally {
+      sw.lock.unlock();
+    }
+    LOG.debug("Processing " + ((session == null) ? "failed" : "successful")
+        + " initialization result for pool " + sw.poolName);
+    // We could not have removed the pool for this session, or we would have CANCELED the init.
+    PoolState pool = pools.get(sw.poolName);
+    if (pool == null || !pool.initializingSessions.remove(sw)) {
+      // Query parallelism might be fubar.
+      LOG.error("Cannot remove initializing session from the pool "
+          + sw.poolName + " - internal error");
+    }
+    poolsToRedistribute.add(sw.poolName);
+    if (session != null) {
+      if (pool != null) {
+        pool.sessions.add(session);
+      } else {
+        LOG.error("Cannot add new session to the pool "
+          + sw.poolName + " because it was removed unexpectedly - internal error " + session);
+        syncWork.toRestartInUse.add(session);
+      }
+    }
+  }
+
+  /**
+   * Shared master-thread handling for a session that the user is destroying or returning.
+   * The caller still performs the actual destroy/return and rejects the invalid case where
+   * both were requested; any other request pending for the same session is dropped here.
+   * @return the result of checkAndRemoveSessionFromItsPool for this session.
+   */
+  private Boolean handleReturnedInUseSessionOnMasterThread(
+      EventState e, WmTezSession session, HashSet<String> poolsToRedistribute) {
+    boolean hadUpdateError = e.updateErrors.remove(session);
+    if (hadUpdateError) {
+      LOG.debug("Ignoring an update error for a session being destroyed or returned");
+    }
+    // Reopen and reuse are invalid for a session that is going away; fail their futures.
+    SettableFuture<WmTezSession> pendingReopen = e.toReopen.remove(session);
+    if (pendingReopen != null) {
+      pendingReopen.setException(new AssertionError("Invalid reopen attempt"));
+    }
+    GetRequest pendingReuse = e.toReuse.remove(session);
+    if (pendingReuse != null) {
+      pendingReuse.future.setException(new AssertionError("Invalid reuse attempt"));
+    }
+    return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
+  }
+
+  /**
+   * Handles a reopen request for a session on the master thread.
+   * The session is removed from its pool and queued in syncWork.toRestartInUse; the caller's
+   * future is handed to a new SessionInitContext that is registered as the callback of
+   * tezAmPool.getSessionAsync(). If the removal fails, the future is failed instead.
+   * @param e the event state being processed; pending update errors and reuse requests for
+   *          this session are removed from it.
+   * @param session the session the caller asked to reopen.
+   * @param future the future the caller of reopen is waiting on.
+   * @param poolsToRedistribute collects names of pools that need to be reprocessed.
+   * @param syncWork collects sessions to be restarted.
+   */
+  private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
+      SettableFuture<WmTezSession> future, HashSet<String> poolsToRedistribute,
+      WmThreadSyncWork syncWork) throws Exception {
+    if (e.updateErrors.remove(session)) {
+      LOG.debug("Ignoring an update error for a session being reopened");
+    }
+    GetRequest reuseRequest = e.toReuse.remove(session);
+    if (reuseRequest != null) {
+      reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
+    }
+    // In order to expedite things in a general case, we are not actually going to reopen
+    // anything. Instead, we will try to give out an existing session from the pool, and restart
+    // the problematic one in background.
+    // Read the pool name before the removal: checkAndRemoveSessionFromItsPool calls
+    // session.clearWm(), which presumably resets it (TODO confirm).
+    String poolName = session.getPoolName();
+    Boolean isRemoved = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
+    // If we fail to remove, it's probably an internal error. We'd try to handle it the same way
+    // as above - by restarting the session. We'd fail the caller to avoid exceeding parallelism.
+    if (isRemoved == null) {
+      future.setException(new RuntimeException("Reopen failed due to an internal error"));
+      syncWork.toRestartInUse.add(session);
+      return;
+    } else if (!isRemoved) {
+      future.setException(new RuntimeException("WM killed this session during reopen: "
+          + session.getReasonForKill()));
+      return; // No longer relevant for WM - bail.
+    }
+    // If the pool didn't exist, checkAndRemoveSessionFromItsPool would have returned null.
+    PoolState pool = pools.get(poolName);
+    SessionInitContext sw = new SessionInitContext(future, poolName);
+    // We have just removed the session from the same pool, so don't check concurrency here.
+    pool.initializingSessions.add(sw);
+    ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+    Futures.addCallback(getFuture, sw);
+    syncWork.toRestartInUse.add(session);
+  }
+
+  /**
+   * Handles a failed resource-allocation update for a session on the master thread.
+   * A pending reuse of the session is downgraded to a plain get request; unless WM has
+   * already killed the session, it is marked irrelevant for WM and queued for restart.
+   */
+  private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError,
+      EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    GetRequest pendingReuse = e.toReuse.remove(sessionWithUpdateError);
+    if (pendingReuse != null) {
+      // This session is bad, so don't allow reuse; just convert it to normal get.
+      pendingReuse.sessionToReuse = null;
+    }
+    // TODO: we should communicate this to the user more explicitly (use kill query API, or
+    //       add an option for bg kill checking to TezTask/monitor?)
+    // We are assuming the update-error AM is bad and just try to kill it.
+    Boolean removalResult = checkAndRemoveSessionFromItsPool(
+        sessionWithUpdateError, poolsToRedistribute);
+    // FALSE means this is an update error for a session that was already killed by us.
+    boolean wasAlreadyKilled = Boolean.FALSE.equals(removalResult);
+    if (!wasAlreadyKilled) {
+      // Restart it whether the removal succeeded or failed (null result).
+      // Since we just restart this from under the user, mark it so we handle it properly when
+      // the user tries to actually use this session and fails, proceeding to return/destroy it.
+      // TODO: propagate this error to TezJobMonitor somehow, after we add the use of KillQuery.
+      sessionWithUpdateError.setIsIrrelevantForWm("Failed to update resource allocation");
+      syncWork.toRestartInUse.add(sessionWithUpdateError);
+    }
+  }
+
+  /**
+   * Applies e.resourcePlanToApply on the master thread.
+   * Replaces the user-to-pool mapping and rebuilds the pools map from the plan's root pools.
+   * Pools that exist under the same full name are taken out of the old map by addHivePool and
+   * updated; whatever remains in the old map afterwards is destroyed. Finally, the AM pool is
+   * resized by the change in total query parallelism.
+   * @param e the event state carrying the plan; also passed to pool update/destroy.
+   * @param syncWork collects sessions to restart or to destroy without restart.
+   * @param poolsToRedistribute collects names of pools that need to be reprocessed.
+   */
+  private void applyNewResourcePlanOnMasterThread(
+      EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    int totalQueryParallelism = 0;
+    // FIXME: Add Triggers from metastore to poolstate
+    // Note: we assume here that plan has been validated beforehand, so we don't verify
+    //       that fractions or query parallelism add up, etc.
+    this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings());
+    HashMap<String, PoolState> oldPools = pools;
+    pools = new HashMap<>();
+    // Use recursion to update parents more conveniently; we don't expect a big tree.
+    for (TmpHivePool pool : e.resourcePlanToApply.getRootPools()) {
+      totalQueryParallelism += addHivePool(
+          pool, null, oldPools, syncWork.toRestartInUse, poolsToRedistribute, e);
+    }
+    if (oldPools != null && !oldPools.isEmpty()) {
+      // Looks like some pools were removed; insert queued queries into the front of get reqs.
+      for (PoolState oldPool : oldPools.values()) {
+        oldPool.destroy(syncWork.toRestartInUse, e.getRequests, e.toReuse);
+      }
+    }
+
+    LOG.info("Updating with " + totalQueryParallelism + " total query parallelism");
+    // Positive delta: the AM pool must grow; negative delta: it must shrink.
+    int deltaSessions = totalQueryParallelism - this.totalQueryParallelism;
+    this.totalQueryParallelism = totalQueryParallelism;
+    if (deltaSessions == 0) return; // Nothing to do.
+    if (deltaSessions < 0) {
+      // First, see if we have unused sessions that we were planning to restart; get rid of those.
+      // Each session moved to toDestroyNoRestart covers one unit of the required shrink.
+      int toTransfer = Math.min(-deltaSessions, syncWork.toRestartInUse.size());
+      for (int i = 0; i < toTransfer; ++i) {
+        syncWork.toDestroyNoRestart.add(syncWork.toRestartInUse.pollFirst());
+      }
+      deltaSessions += toTransfer;
+    }
+    if (deltaSessions != 0) {
+      failOnFutureFailure(tezAmPool.resizeAsync(
+          deltaSessions, syncWork.toDestroyNoRestart));
+    }
+  }
+
+  /**
+   * Attaches FATAL_ERROR_CALLBACK to the given future.
+   * NOTE(review): presumably the callback treats a failed future as a fatal error - confirm
+   * against the definition of FATAL_ERROR_CALLBACK.
+   */
+  @SuppressWarnings("unchecked")
+  private void failOnFutureFailure(ListenableFuture<?> future) {
+    Futures.addCallback(future, FATAL_ERROR_CALLBACK);
+  }
+
+  /**
+   * Processes a single get-session request on the master thread.
+   * The request is mapped to a pool by user name. If the mapping or the pool is missing, the
+   * request's future is failed and any session offered for reuse is returned. If a session is
+   * offered for reuse and can be removed from its current pool while the target pool has
+   * spare query parallelism, the request is satisfied immediately with that session;
+   * otherwise the request is appended to the target pool's queue.
+   * @param req the request to process; its sessionToReuse field may be cleared here.
+   * @param poolsToRedistribute collects names of pools that need to be reprocessed.
+   * @param syncWork collects sessions that could not be returned to the AM pool.
+   */
+  private void queueGetRequestOnMasterThread(
+      GetRequest req, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) {
+    String poolName = userPoolMapping.mapSessionToPoolName(req.userName);
+    if (poolName == null) {
+      req.future.setException(new HiveException(
+          "Cannot find any pool mapping for user " + req.userName));
+      returnSessionOnFailedReuse(req, syncWork, poolsToRedistribute);
+      return;
+    }
+    PoolState pool = pools.get(poolName);
+    if (pool == null) {
+      req.future.setException(new AssertionError(poolName + " not found (internal error)."));
+      returnSessionOnFailedReuse(req, syncWork, poolsToRedistribute);
+      return;
+    }
+
+    PoolState oldPool = null;
+    if (req.sessionToReuse != null) {
+      // Given that we are trying to reuse, this session MUST be in some pool.sessions.
+      // Kills that could have removed it must have cleared sessionToReuse.
+      String oldPoolName = req.sessionToReuse.getPoolName();
+      oldPool = pools.get(oldPoolName);
+      Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
+      // In both branches below the removal was already attempted above, so null is passed as
+      // poolsToRedistribute to make returnSessionOnFailedReuse skip a second removal.
+      if (isRemoved == null || !isRemoved) {
+        // This is probably an internal error... abandon the reuse attempt.
+        returnSessionOnFailedReuse(req, syncWork, null);
+        req.sessionToReuse = null;
+      } else if (pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism) {
+        // One cannot simply reuse the session if there are other queries waiting; to maintain
+        // fairness, we'll try to take a query slot instantly, and if that fails we'll return
+        // this session back to the pool and give the user a new session later.
+        returnSessionOnFailedReuse(req, syncWork, null);
+        req.sessionToReuse = null;
+      }
+    }
+
+    if (req.sessionToReuse != null) {
+      // If we can immediately reuse a session, there's nothing to wait for - just return.
+      req.sessionToReuse.setPoolName(poolName);
+      req.sessionToReuse.setQueueName(yarnQueue);
+      pool.sessions.add(req.sessionToReuse);
+      // The old pool was already marked by checkAndRemoveSessionFromItsPool; mark the new one
+      // only if the session actually moved between pools.
+      if (pool != oldPool) {
+        poolsToRedistribute.add(poolName);
+      }
+      req.future.set(req.sessionToReuse);
+      return;
+    }
+    // Otherwise, queue the session and make sure we update this pool.
+    pool.queue.addLast(req);
+    poolsToRedistribute.add(poolName);
+  }
+
+
+  /**
+   * Reprocesses one pool on the master thread: starts as many queued requests as the pool's
+   * query parallelism allows, recomputes the pool's allocation percentages and pushes them to
+   * the allocation manager, then updates the trigger provider (if any) with this pool's
+   * sessions and triggers.
+   * @param poolName the pool to process; silently ignored if no such pool exists.
+   * @param context not referenced in this method body.
+   * @param hasRequeues if true, the pool queue is sorted with GetRequest.ORDER_COMPARATOR
+   *                    before requests are taken from it.
+   */
+  private void processPoolChangesOnMasterThread(
+      String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception {
+    PoolState pool = pools.get(poolName);
+    if (pool == null) return; // Might be from before the new resource plan.
+
+    // 1. First, start the queries from the queue.
+    // This can be zero or negative when the pool is at or over its parallelism; then the
+    // loop below does not run.
+    int queriesToStart = Math.min(pool.queue.size(),
+        pool.queryParallelism - pool.getTotalActiveSessions());
+    if (queriesToStart > 0) {
+      LOG.debug("Starting {} queries in pool {}", queriesToStart, pool);
+    }
+    if (hasRequeues) {
+      // Sort the queue - we may have put items here out of order.
+      Collections.sort(pool.queue, GetRequest.ORDER_COMPARATOR);
+    }
+    for (int i = 0; i < queriesToStart; ++i) {
+      GetRequest queueReq = pool.queue.pollFirst();
+      assert queueReq.sessionToReuse == null;
+      // Note that in theory, we are guaranteed to have a session waiting for us here, but
+      // the expiration, failures, etc. may cause one to be missing pending restart.
+      // See SessionInitContext javadoc.
+      SessionInitContext sw = new SessionInitContext(queueReq.future, poolName);
+      ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+      Futures.addCallback(getFuture, sw);
+      // It is possible that all the async methods returned on the same thread because the
+      // session with registry data and stuff was available in the pool.
+      // If this happens, we'll take the session out here and "cancel" the init so we skip
+      // processing the message that the successful init has queued for us.
+      boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions);
+      if (!isDone) {
+        pool.initializingSessions.add(sw);
+      }
+      // The user has already been notified of completion by SessionInitContext.
+    }
+
+    // 2. Then, update pool allocations.
+    double totalAlloc = pool.updateAllocationPercentages();
+    // We are calling this here because we expect the method to be completely async. We also don't
+    // want this call itself to go on a thread because we want the percent-to-physics conversion
+    // logic to be consistent between all the separate calls in one master thread processing round.
+    // Note: If allocation manager does not have cluster state, it won't update anything. When the
+    //       cluster state changes, it will notify us, and we'd update the queries again.
+    allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+
+    // 3. Update triggers for this pool.
+    //    TODO: need to merge with per-pool enforcement, it will only work for one pool for now.
+    if (sessionTriggerProvider != null) {
+      sessionTriggerProvider.setOpenSessions(
+          Collections.<TezSessionState>unmodifiableList(pool.sessions));
+      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(pool.triggers));
+    }
+  }
+
+  /**
+   * Gives back the session attached to a request whose reuse attempt was abandoned.
+   * Does nothing if the request carries no session. The session is handed to
+   * tezAmPool.returnSessionAsync; if that returns false, the session is queued in
+   * syncWork.toDestroyNoRestart. The request's sessionToReuse is cleared afterwards.
+   * @param poolsToRedistribute if non-null, the session is first removed from its pool;
+   *                            pass null when the caller has already done the removal.
+   */
+  private void returnSessionOnFailedReuse(
+      GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    WmTezSession reusable = req.sessionToReuse;
+    if (reusable == null) return;
+    if (poolsToRedistribute != null) {
+      Boolean removalResult = checkAndRemoveSessionFromItsPool(reusable, poolsToRedistribute);
+      // The session cannot have been killed; this happens after all the kills in the current
+      // iteration, so we would have cleared sessionToReuse when killing this.
+      assert !Boolean.FALSE.equals(removalResult);
+    }
+    boolean isReturned = tezAmPool.returnSessionAsync(reusable);
+    if (!isReturned) {
+      syncWork.toDestroyNoRestart.add(reusable);
+    }
+    req.sessionToReuse = null;
+  }
+
+  /**
+   * Recursively adds a pool from the resource plan, and all of its children, to the pools map.
+   * A pool already present in oldPools under the same full name is removed from oldPools,
+   * updated in place and marked for redistribution; otherwise a new PoolState is created.
+   * @param pool the plan pool to add.
+   * @param parent the state of the parent pool, or null for a root pool; its
+   *               finalFractionRemaining is reduced by this pool's fraction.
+   * @param oldPools pools from the previous plan, keyed by full name; may be null.
+   * @param toKill passed to PoolState.update for an existing pool.
+   * @param poolsToRedistribute collects names of pools that need to be reprocessed.
+   * @param e the event state being processed; passed to PoolState.update.
+   * @return the sum of the query parallelism of this pool and all of its descendants.
+   */
+  private int addHivePool(TmpHivePool pool, PoolState parent,
+      HashMap<String, PoolState> oldPools, List<WmTezSession> toKill,
+      HashSet<String> poolsToRedistribute, EventState e) {
+    String fullName = pool.getName();
+    int totalQueryParallelism = pool.getQueryParallelism();
+    double fraction = pool.getResourceFraction();
+    if (parent != null) {
+      // A nested pool is named relative to its parent and gets a share of the parent's fraction.
+      fullName = parent.fullName + POOL_SEPARATOR + fullName;
+      fraction = parent.finalFraction * pool.getResourceFraction();
+      parent.finalFractionRemaining -= fraction;
+    }
+    PoolState state = oldPools == null ? null : oldPools.remove(fullName);
+    if (state == null) {
+      state = new PoolState(fullName, totalQueryParallelism, fraction);
+    } else {
+      // This will also take care of the queries if query parallelism changed.
+      state.update(totalQueryParallelism, fraction, toKill, e);
+      poolsToRedistribute.add(fullName);
+    }
+    state.setTriggers(pool.triggers);
+
+    if (pool.getChildren() != null) {
+      for (TmpHivePool child : pool.getChildren()) {
+        totalQueryParallelism += addHivePool(
+            child, state, oldPools, toKill, poolsToRedistribute, e);
+      }
+    }
+    LOG.info("Adding Hive pool: " + state + " with triggers " + pool.triggers);
+    pools.put(fullName, state);
+    return totalQueryParallelism;
+  }
+
+
+  /**
+   * Checks if the session is still relevant for WM and if yes, removes it from its pool.
+   * For a relevant session, session.clearWm() is called and the pool name (if any) is added
+   * to poolsToRedistribute before the pool lookup, i.e. even when null is returned.
+   * @param session the session to remove.
+   * @param poolsToRedistribute collects names of pools that need to be reprocessed.
+   * @return true if the session was removed; false if the session was already processed by WM
+   *         thread (so we are dealing with an outdated request); null if the session should be
+   *         in WM but wasn't found in the requisite pool (internal error?).
+   */
+  private Boolean checkAndRemoveSessionFromItsPool(
+      WmTezSession session, HashSet<String> poolsToRedistribute) {
+    // It is possible for some request to be queued after a main thread has decided to kill this
+    // session; on the next iteration, we'd be processing that request with an irrelevant session.
+    if (session.isIrrelevantForWm()) {
       return false;
     }
-    return true;
+    // If we did not kill this session we expect everything to be present.
+    // Read the pool name before clearWm(), which presumably resets it (TODO confirm).
+    String poolName = session.getPoolName();
+    session.clearWm();
+    if (poolName != null) {
+      poolsToRedistribute.add(poolName);
+      PoolState pool = pools.get(poolName);
+      if (pool != null && pool.sessions.remove(session)) return true;
+    }
+    LOG.error("Session was not in the pool (internal error) " + poolName + ": " + session);
+    return null;
   }
 
-  private void redistributePoolAllocations(
-      String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove,
-      boolean releaseParallelism) {
-    List<WmTezSession> sessionsToUpdate = null;
-    double totalAlloc = 0;
-    assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName());
-    assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName());
-    poolsLock.readLock().lock();
-    boolean hasRemoveFailed = false;
+  // ===== EVENT METHODS
+
+  /**
+   * Submits a resource plan to be applied by the WM thread.
+   * If another plan is still pending in the current event state, that plan's future is failed
+   * with a HiveException and the new plan replaces it.
+   * @param plan the resource plan to apply.
+   * @return the future stored alongside the plan in the current event state.
+   */
+  public Future<Boolean> updateResourcePlanAsync(TmpResourcePlan plan) {
+    SettableFuture<Boolean> applyRpFuture = SettableFuture.create();
+    currentLock.lock();
     try {
-      PoolState pool = pools.get(poolName);
-      synchronized (pool.lock) {
-        // This should be a 2nd order fn but it's too much pain in Java for one LOC.
-        if (sessionToAdd != null) {
-          pool.sessions.add(sessionToAdd);
-        }
-        if (sessionToRemove != null) {
-          // TODO: this assumes that the update process will take the write lock, and make
-          //       everything right w.r.t. semaphores, pool names and other stuff, since we might
-          //       be releasing a different semaphore from the one we acquired if it's across
-          //       the update. If the magic in the update is weak, this may become more involved.
-          if (!pool.sessions.remove(sessionToRemove)) {
-            LOG.error("Session " + sessionToRemove + " could not be removed from the pool");
-            if (releaseParallelism) {
-              hasRemoveFailed = true;
-            }
-          } else if (releaseParallelism) {
-            pool.sessionsClaimed.release();
-          }
-          sessionToRemove.setClusterFraction(0);
-        }
-        totalAlloc = updatePoolAllocations(pool.sessions, pool.finalFractionRemaining);
-        sessionsToUpdate = new ArrayList<>(pool.sessions);
+      // TODO: if there's versioning/etc., it will come in here. For now we rely on external
+      //       locking or ordering of calls. This should potentially return a Future for that.
+      if (current.resourcePlanToApply != null) {
+        LOG.warn("Several resource plans are being applied at the same time; using the latest");
+        current.applyRpFuture.setException(
+            new HiveException("Another plan was applied in parallel"));
+      }
+      current.resourcePlanToApply = plan;
+      current.applyRpFuture = applyRpFuture;
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+    return applyRpFuture;
+  }
+
+  /**
+   * A pending request for a session: who asked, the future to complete, the session the
+   * caller offered for reuse (may be null, and may be cleared later), and a sequence number
+   * used to order requests.
+   */
+  private final static class GetRequest {
+    /** Orders requests by ascending sequence number. */
+    public static final Comparator<GetRequest> ORDER_COMPARATOR = new Comparator<GetRequest>() {
+      @Override
+      public int compare(GetRequest left, GetRequest right) {
+        return Long.compare(left.order, right.order);
+      }
+    };
+    private final long order;
+    private final String userName;
+    private final SettableFuture<WmTezSession> future;
+    private WmTezSession sessionToReuse;
+
+    private GetRequest(String userName, SettableFuture<WmTezSession> future,
+        WmTezSession sessionToReuse, long order) {
+      this.order = order;
+      this.userName = userName;
+      this.future = future;
+      this.sessionToReuse = sessionToReuse;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder("[#");
+      text.append(order).append(", ").append(userName);
+      text.append(", reuse ").append(sessionToReuse).append("]");
+      return text.toString();
+    }
+  }
+
+  /**
+   * Requests a session for the given user and blocks until the request's future completes.
+   * The request is queued in the current event state for the WM thread. If the supplied
+   * session passes checkSessionForReuse, it is attached to the request and also recorded in
+   * current.toReuse.
+   * @param session a session the caller would like to reuse; may be null.
+   * @param userName the user the request is made for.
+   * @param conf checked by validateConfig before the request is queued.
+   * @return the session the request's future was completed with.
+   */
+  public TezSessionState getSession(
+      TezSessionState session, String userName, HiveConf conf) throws Exception {
+    // Note: not actually used for pool sessions; verify some things like doAs are not set.
+    validateConfig(conf);
+    SettableFuture<WmTezSession> future = SettableFuture.create();
+    WmTezSession wmSession = checkSessionForReuse(session);
+    GetRequest req = new GetRequest(
+        userName, future, wmSession, getRequestVersion.incrementAndGet());
+    currentLock.lock();
+    try {
+      current.getRequests.add(req);
+      if (req.sessionToReuse != null) {
+        // Note: we assume reuse is only possible for the same user and config.
+        current.toReuse.put(wmSession, req);
       }
+      notifyWmThreadUnderLock();
     } finally {
-      poolsLock.readLock().unlock();
+      currentLock.unlock();
     }
-    allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate);
-    updateSessionsTriggers();
-    if (hasRemoveFailed) {
-      throw new AssertionError("Cannot remove the session from the pool and release "
-          + "the query slot; HS2 may fail to accept queries");
+    return future.get();
+  }
+
+  /**
+   * Queues a session owned by this manager for destruction by the WM thread.
+   * The thread-local SessionState reference to the session is cleared synchronously first;
+   * the rest happens asynchronously, so this method returns before the session is processed.
+   */
+  @Override
+  public void destroy(TezSessionState session) throws Exception {
+    WmTezSession wmTezSession = ensureOwnedSession(session);
+    resetGlobalTezSession(wmTezSession);
+    currentLock.lock();
+    try {
+      current.toDestroy.add(wmTezSession);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
     }
   }
 
+  /**
+   * Clears the Tez session held by the current SessionState, but only if it is exactly the
+   * given session.
+   */
+  private void resetGlobalTezSession(WmTezSession wmTezSession) {
+    // This has to be done synchronously to avoid the caller getting this session again.
+    // Ideally we'd get rid of this thread-local nonsense.
+    SessionState threadState = SessionState.get();
+    if (threadState == null || threadState.getTezSession() != wmTezSession) {
+      return;
+    }
+    threadState.setTezSession(null);
+  }
+
+  /**
+   * Queues a session owned by this manager to be returned by the WM thread.
+   * The thread-local SessionState reference to the session is cleared synchronously first.
+   */
+  @Override
+  public void returnAfterUse(TezSessionPoolSession session) throws Exception {
+    WmTezSession wmTezSession = ensureOwnedSession(session);
+    resetGlobalTezSession(wmTezSession);
+    currentLock.lock();
+    try {
+      current.toReturn.add(wmTezSession);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
+  // TODO: use this
+  /** Flags a cluster state change in the current event state and wakes the WM thread. */
+  public void nofityOfClusterStateChange() {
+    currentLock.lock();
+    try {
+      current.hasClusterStateChanged = true;
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
+  /**
+   * Adds the session to the update errors of the current event state and wakes the WM thread.
+   */
+  public void addUpdateError(WmTezSession wmTezSession) {
+    currentLock.lock();
+    try {
+      current.updateErrors.add(wmTezSession);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
+  /**
+   * Adds a test event that's processed at the end of WM iteration.
+   * This allows tests to wait for an iteration to finish without messing with the threading
+   * logic (that is prone to races if we e.g. remember the state before and wait for it to change,
+   * self-deadlocking when triggering things explicitly and calling a blocking API, and hanging
+   * forever if we wait for "another iteration"). If addTestEvent is called after all the other
+   * calls of interest, it is guaranteed that the events from those calls will be processed
+   * fully when the future is triggered.
+   * The current event state holds a single test event; this call overwrites whatever future
+   * was stored there before.
+   * @return the future stored as the test event.
+   */
+  @VisibleForTesting
+  Future<Boolean> addTestEvent() {
+    SettableFuture<Boolean> testEvent = SettableFuture.create();
+    currentLock.lock();
+    try {
+      current.testEvent = testEvent;
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+    return testEvent;
+  }
+
+  /**
+   * Adds the init context to the init results of the current event state and wakes the WM
+   * thread.
+   */
+  public void notifyInitializationCompleted(SessionInitContext initCtx) {
+    currentLock.lock();
+    try {
+      current.initResults.add(initCtx);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
+
+  /**
+   * Reopens a session owned by this manager.
+   * With additional files, the session is reopened in place through
+   * TezSessionPoolManager.reopenInternal and the same object is returned. Otherwise a reopen
+   * request is queued for the WM thread and this method blocks until its future completes.
+   * @param session the session to reopen; must be owned by this manager.
+   * @param conf not used by this implementation.
+   * @param additionalFiles extra files for the reopened session; may be null or empty.
+   * @return the session to use from now on.
+   * @throws AssertionError if a reopen of the same session is already pending.
+   */
+  @Override
+  public TezSessionState reopen(TezSessionState session, Configuration conf,
+      String[] additionalFiles) throws Exception {
+    WmTezSession wmTezSession = ensureOwnedSession(session);
+    if (wmTezSession.getConf() == null) {
+      // Diagnostics only: the session configuration is not needed below, so no replacement
+      // HiveConf is built here.
+      LOG.warn("Session configuration is null for " + wmTezSession);
+    }
+    // TODO: ideally, we should handle reopen the same way no matter what. However, the cases
+    //       with additional files will have to wait until HIVE-17827 is resolved, because it's
+    //       difficult to determine how the additionalFiles are to be propagated/reused between
+    //       two sessions. Once the update logic is encapsulated in the session we can remove this.
+    if (additionalFiles != null && additionalFiles.length > 0) {
+      TezSessionPoolManager.reopenInternal(session, additionalFiles);
+      return session;
+    }
+
+    SettableFuture<WmTezSession> future = SettableFuture.create();
+    currentLock.lock();
+    try {
+      if (current.toReopen.containsKey(wmTezSession)) {
+        throw new AssertionError("The session is being reopened more than once " + session);
+      }
+      current.toReopen.put(wmTezSession, future);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+    return future.get();
+  }
+
+  /**
+   * Replaces an expired session directly in the AM pool, without queueing an event for the
+   * WM thread.
+   */
+  @Override
+  public void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception {
+    // By definition, this session is not in use and can no longer be in use, so it only
+    // affects the session pool. We can handle this inline.
+    tezAmPool.replaceSession(ensureOwnedSession(session), false, null);
+  }
+
+  // ======= VARIOUS UTILITY METHODS
+
+  /**
+   * Marks that there are pending changes and signals everyone waiting on hasChangesCondition.
+   * Does nothing if changes are already flagged. As the name says, call with the lock held.
+   */
+  private void notifyWmThreadUnderLock() {
+    if (!hasChanges) {
+      hasChanges = true;
+      hasChangesCondition.signalAll();
+    }
+  }
+
   private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception {
     if (session == null) return null;
     WmTezSession result = null;
@@ -391,7 +1035,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       if (result.isOwnedBy(this)) {
         return result;
       }
-      // TODO: this should never happen, at least for now. Throw?
+      // This should never happen, at least for now. Throw?
       LOG.warn("Attempting to reuse a session not belonging to us: " + result);
       result.returnToSessionManager();
       return null;
@@ -405,15 +1049,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return null;
   }
 
-  private double updatePoolAllocations(List<WmTezSession> sessions, double totalFraction) {
-    // TODO: real implementation involving in-the-pool policy interface, etc.
-    double allocation = totalFraction / sessions.size();
-    for (WmTezSession session : sessions) {
-      session.setClusterFraction(allocation);
-    }
-    return totalFraction;
-  }
-
   private void validateConfig(HiveConf conf) throws HiveException {
     String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
     if ((queueName != null) && !queueName.isEmpty()) {
@@ -429,69 +1064,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  public void start() throws Exception {
-    sessions.startInitialSessions();
-    if (expirationTracker != null) {
-      expirationTracker.start();
-    }
-    allocationManager.start();
-  }
-
-  public void stop() throws Exception {
-    List<TezSessionPoolSession> sessionsToClose = null;
-    synchronized (openSessions) {
-      sessionsToClose = new ArrayList<>(openSessions.keySet());
-    }
-
-    for (TezSessionPoolSession sessionState : sessionsToClose) {
-      sessionState.close(false);
-    }
-
-    if (expirationTracker != null) {
-      expirationTracker.stop();
-    }
-    allocationManager.stop();
-
-    INSTANCE = null;
-  }
-
-  private WmTezSession createSession() {
-    WmTezSession session = createSessionObject(TezSessionState.makeSessionId());
+  /**
+   * Creates a session object with a fresh session id, assigns it the manager's YARN queue
+   * and calls setDefault() on it. Only the object is created here; nothing is opened.
+   */
+  private WmTezSession createSession(HiveConf conf) {
+    WmTezSession session = createSessionObject(TezSessionState.makeSessionId(), conf);
     session.setQueueName(yarnQueue);
     session.setDefault();
-    LOG.info("Created new interactive session " + session.getSessionId());
+    LOG.info("Created new interactive session object " + session.getSessionId());
     return session;
   }
 
   @VisibleForTesting
-  protected WmTezSession createSessionObject(String sessionId) {
-    return new WmTezSession(sessionId, this, expirationTracker, new HiveConf(conf));
-  }
-
-  @Override
-  public void returnAfterUse(TezSessionPoolSession session) throws Exception {
-    returnAfterUse(session, true);
-  }
-
-  private void returnAfterUse(
-      TezSessionPoolSession session, boolean releaseParallelism) throws Exception {
-    boolean isInterrupted = Thread.interrupted();
-    try {
-      WmTezSession wmSession = ensureOwnedSession(session);
-      redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, releaseParallelism);
-      sessions.returnSession((WmTezSession) session);
-    } finally {
-      // Reset the interrupt status.
-      if (isInterrupted) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  /** Closes a running (expired) pool session and reopens it. */
-  @Override
-  public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
-    sessions.replaceSession(ensureOwnedSession(oldSession), createSession(), false, null, null);
+  protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
+    // When the caller supplies no configuration, use a new HiveConf built from this.conf.
+    conf = (conf == null) ? new HiveConf(this.conf) : conf;
+    return new WmTezSession(sessionId, this, expirationTracker, conf);
   }
 
   private WmTezSession ensureOwnedSession(TezSessionState oldSession) {
@@ -506,9 +1090,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   @Override
   public void registerOpenSession(TezSessionPoolSession session) {
     synchronized (openSessions) {
-      openSessions.put(session, null);
+      openSessions.put(session, true);
     }
-    updateSessionsTriggers();
   }
 
   /** Called by TezSessionPoolSession when closed. */
@@ -517,20 +1100,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     synchronized (openSessions) {
       openSessions.remove(session);
     }
-    updateSessionsTriggers();
-  }
-
-  private void updateSessionsTriggers() {
-    if (sessionTriggerProvider != null) {
-      List<TezSessionState> openSessions = new ArrayList<>();
-      List<Trigger> activeTriggers = new ArrayList<>();
-      for (PoolState poolState : pools.values()) {
-        activeTriggers.addAll(poolState.getTriggers());
-        openSessions.addAll(poolState.sessions);
-      }
-      sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions));
-      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(activeTriggers));
-    }
   }
 
   @VisibleForTesting
@@ -538,41 +1107,25 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return expirationTracker;
   }
 
-  @Override
-  public TezSessionState reopen(TezSessionState session, Configuration conf,
-      String[] additionalFiles) throws Exception {
-    WmTezSession oldSession = ensureOwnedSession(session), newSession = createSession();
-    newSession.setPoolName(oldSession.getPoolName());
-    HiveConf sessionConf = session.getConf();
-    if (sessionConf == null) {
-      LOG.warn("Session configuration is null for " + session);
-      // default queue name when the initial session was created
-      sessionConf = new HiveConf(conf, WorkloadManager.class);
-    }
-    sessions.replaceSession(oldSession, newSession, true, additionalFiles, sessionConf);
-    // We are going to immediately give this session out, so ensure AM registry.
-    if (!ensureAmIsRegistered(newSession)) {
-      throw new Exception("Session is not usable after reopen");
-    }
-    // Do not release the parallelism - we are just replacing the session in the same pool.
-    redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession, false);
-    return newSession;
+  /** Returns the initial size reported by the AM pool. */
+  @VisibleForTesting
+  int getNumSessions() {
+    return tezAmPool.getInitialSize();
   }
 
-  @Override
-  public void destroy(TezSessionState session) throws Exception {
-    LOG.warn("Closing a pool session because of retry failure.");
-    // We never want to lose pool sessions. Replace it instead; al trigger duck redistribution.
-    WmTezSession wmSession = ensureOwnedSession(session);
-    closeAndReopenPoolSession(wmSession);
-    redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, true);
+  /** Returns the configuration object held by this manager (not a copy). */
+  protected final HiveConf getConf() {
+    return conf;
   }
 
-  @VisibleForTesting
-  int getNumSessions() {
-    return sessions.getInitialSize();
+  /**
+   * Collects the counter name of every active trigger known to the session trigger provider.
+   * @return a new list of counter names, in the provider's trigger order; empty if there is
+   *         no trigger provider.
+   */
+  public List<String> getTriggerCounterNames() {
+    List<String> counterNames = new ArrayList<>();
+    // The provider is optional (it is null-checked before use elsewhere in this class);
+    // without it there are no triggers, so don't dereference it.
+    if (sessionTriggerProvider == null) {
+      return counterNames;
+    }
+    for (Trigger trigger : sessionTriggerProvider.getActiveTriggers()) {
+      counterNames.add(trigger.getExpression().getCounterLimit().getName());
+    }
+    return counterNames;
   }
 
+
   @Override
   SessionTriggerProvider getSessionTriggerProvider() {
     return sessionTriggerProvider;
@@ -588,38 +1141,319 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return triggerValidatorRunnable;
   }
 
-  @VisibleForTesting
-  public Map<String, PoolState> getPools() {
-    return pools;
+  /**
+   * State of a single pool.
+   * Unless otherwise specified, the members are only modified by the master thread.
+   */
+  private static class PoolState {
+    // Add stuff here as WM is implemented.
+    private final LinkedList<SessionInitContext> initializingSessions = new LinkedList<>();
+    // Note: the list is expected to be a few items; if it's longer we may want an IHM.
+    private final LinkedList<WmTezSession> sessions = new LinkedList<>();
+    private final LinkedList<GetRequest> queue = new LinkedList<>();
+
+    private final String fullName;
+    private double finalFraction;
+    private double finalFractionRemaining;
+    private int queryParallelism = -1;
+    private List<Trigger> triggers = new ArrayList<>();
+
+    /**
+     * Creates the state for the pool named {@code fullName} and applies the initial
+     * parallelism and fraction via {@code update}.
+     */
+    public PoolState(String fullName, int queryParallelism, double fraction) {
+      this.fullName = fullName;
+      // No kill list or event state is passed. update() dereferences them only when
+      // queryParallelism is below the active session count (0 for a new pool) or when the
+      // queue is non-empty, so a negative queryParallelism would fail here.
+      update(queryParallelism, fraction, null, null);
+    }
+
+    /** Returns the number of sessions in this pool, including the ones still initializing. */
+    public int getTotalActiveSessions() {
+      return sessions.size() + initializingSessions.size();
+    }
+
+    /**
+     * Applies new settings to this pool and hands its queued requests back to the caller.
+     *
+     * @param queryParallelism the new query parallelism of the pool
+     * @param fraction the new cluster fraction; resets both the final and remaining fraction
+     * @param toKill receives the sessions removed from the pool when it has more active
+     *               sessions than the new parallelism allows
+     * @param e event state whose {@code toReuse} map and {@code getRequests} list are updated.
+     *          NOTE(review): the constructor passes null for both toKill and e; that only
+     *          works while the pool has no sessions and no queued requests.
+     */
+    public void update(int queryParallelism, double fraction,
+        List<WmTezSession> toKill, EventState e) {
+      this.finalFraction = this.finalFractionRemaining = fraction;
+      this.queryParallelism = queryParallelism;
+      // TODO: two possible improvements
+      //       1) Right now we kill all the queries here; we could just kill -qpDelta.
+      //       2) After the queries are killed queued queries would take their place.
+      //          If we could somehow restart queries we could instead put them at the front
+      //          of the queue (esp. in conjunction with (1)) and rerun them.
+      if (queryParallelism < getTotalActiveSessions()) {
+        extractAllSessionsToKill("The query pool was resized by administrator", e.toReuse, toKill);
+      }
+      // We will requeue, and not kill, the queries that are not running yet.
+      // Insert them all before the get requests from this iteration.
+      // Draining from the tail while inserting at the head keeps the queue's relative order.
+      GetRequest req;
+      while ((req = queue.pollLast()) != null) {
+        e.getRequests.addFirst(req);
+      }
+    }
+
+    /**
+     * Empties this pool: every session is extracted for killing and every queued request is
+     * moved to the front of {@code globalQueue}.
+     *
+     * @param toKill receives the sessions removed from the pool
+     * @param globalQueue receives this pool's queued requests, inserted at index 0
+     * @param toReuse pending reuse requests; entries for removed sessions are dropped
+     */
+    public void destroy(List<WmTezSession> toKill, LinkedList<GetRequest> globalQueue,
+        IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+      extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill);
+      // All the pending get requests should just be requeued elsewhere.
+      // Note that we never queue session reuse so sessionToReuse would be null.
+      globalQueue.addAll(0, queue);
+      queue.clear();
+    }
+
+    /**
+     * Splits the pool's remaining cluster fraction evenly among all of its sessions, running
+     * and initializing alike, and assigns every running session its share.
+     *
+     * @return the remaining fraction minus the shares set aside for initializing sessions,
+     *         i.e. the part handed to running sessions; 0 if the pool has no sessions
+     */
+    public double updateAllocationPercentages() {
+      // TODO: real implementation involving in-the-pool policy interface, etc.
+      int totalSessions = sessions.size() + initializingSessions.size();
+      if (totalSessions == 0) {
+        // Nothing to allocate. Dividing by zero would give Infinity (or NaN), and the
+        // multiplication by zero below would then turn the result into NaN.
+        return 0;
+      }
+      double allocation = finalFractionRemaining / totalSessions;
+      for (WmTezSession session : sessions) {
+        session.setClusterFraction(allocation);
+      }
+      // Do not give out the capacity of the initializing sessions to the running ones;
+      // we expect init to be fast.
+      return finalFractionRemaining - allocation * initializingSessions.size();
+    }
+
+    /** Describes the pool: name, parallelism, cluster fractions and session counts. */
+    @Override
+    public String toString() {
+      double usedByChildPools = finalFraction - finalFractionRemaining;
+      StringBuilder text = new StringBuilder("[");
+      text.append(fullName);
+      text.append(", query parallelism ").append(queryParallelism);
+      text.append(", fraction of the cluster ").append(finalFraction);
+      text.append(", fraction used by child pools ").append(usedByChildPools);
+      text.append(", active sessions ").append(sessions.size());
+      text.append(", initializing sessions ").append(initializingSessions.size());
+      text.append("]");
+      return text.toString();
+    }
+
+    /**
+     * Removes every session from this pool. Running sessions, and initializing sessions whose
+     * init has already completed, are reset and added to {@code toKill}; initializations still
+     * in flight are cancelled and left to their callback.
+     *
+     * @param killReason reason recorded on each removed session and cancelled initialization
+     * @param toReuse pending reuse requests; entries for removed sessions are dropped
+     * @param toKill receives the removed sessions
+     */
+    private void extractAllSessionsToKill(String killReason,
+        IdentityHashMap<WmTezSession, GetRequest> toReuse, List<WmTezSession> toKill) {
+      for (WmTezSession sessionToKill : sessions) {
+        resetRemovedSession(sessionToKill, killReason, toReuse);
+        toKill.add(sessionToKill);
+      }
+      sessions.clear();
+      for (SessionInitContext initCtx : initializingSessions) {
+        // It is possible that the background init thread has finished in parallel, queued
+        // the message for us but also returned the session to the user.
+        WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone(killReason);
+        if (sessionToKill == null) {
+          continue; // Async op in progress; the callback will take care of this.
+        }
+        resetRemovedSession(sessionToKill, killReason, toReuse);
+        toKill.add(sessionToKill);
+      }
+      initializingSessions.clear();
+    }
+
+    /**
+     * Detaches a session that is being removed from the pool: marks it irrelevant for WM with
+     * the given reason, clears its WM state, and, if a reuse request was pending for it,
+     * removes that request from {@code toReuse} and clears its {@code sessionToReuse}.
+     */
+    private void resetRemovedSession(WmTezSession sessionToKill, String killReason,
+        IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+      assert killReason != null;
+      sessionToKill.setIsIrrelevantForWm(killReason);
+      sessionToKill.clearWm();
+      GetRequest req = toReuse.remove(sessionToKill);
+      if (req != null) {
+        // The request can no longer reuse this session.
+        req.sessionToReuse = null;
+      }
+    }
+
+    /** Replaces this pool's trigger list; the given list is stored as is, not copied. */
+    @VisibleForTesting
+    // will change in HIVE-17809
+    public void setTriggers(final List<Trigger> triggers) {
+      this.triggers = triggers;
+    }
+
+    /** Returns this pool's trigger list itself, not a copy. */
+    public List<Trigger> getTriggers() {
+      return triggers;
+    }
   }
 
-  protected final HiveConf getConf() {
-    return conf;
+
+  /** The stages of an asynchronous session initialization tracked by SessionInitContext. */
+  private enum SessionInitState {
+    GETTING, // We are getting a session from TezSessionPool
+    WAITING_FOR_REGISTRY, // We have the session but it doesn't have registry info yet.
+    DONE, // We have the session with registry info, or we have failed.
+    CANCELED // The master thread has CANCELED this and will never look at it again.
   }
 
-  public List<String> getTriggerCounterNames() {
-    List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
-    List<String> counterNames = new ArrayList<>();
-    for (Trigger trigger : activeTriggers) {
-      counterNames.add(trigger.getExpression().getCounterLimit().getName());
+  /**
+   * The class that serves as a synchronization point, and future callback,
+   * for async session initialization, as well as parallel cancellation.
+   */
+  private final class SessionInitContext implements FutureCallback<WmTezSession> {
+    private final String poolName;
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private WmTezSession session;
+    private SettableFuture<WmTezSession> future;
+    private SessionInitState state;
+    private String cancelReason;
+
+    /**
+     * Creates a context in the GETTING state.
+     *
+     * @param future the future completed with the session, or with the failure, once
+     *               initialization finishes or is cancelled
+     * @param poolName the pool name set on the session when it is received
+     */
+    public SessionInitContext(SettableFuture<WmTezSession> future, String poolName) {
+      this.state = SessionInitState.GETTING;
+      this.future = future;
+      this.poolName = poolName;
+    }
+
+    /**
+     * Callback for both successful steps of initialization; the step is selected by the state
+     * observed under the lock.
+     * <ul>
+     * <li>GETTING: stores the session, sets its pool and queue names, and starts the async
+     *     wait for AM registry info with this object as the callback.</li>
+     * <li>WAITING_FOR_REGISTRY: moves to DONE, completes the future with the session and
+     *     notifies the master thread.</li>
+     * <li>CANCELED: fails the future with the cancel reason and returns the session to the
+     *     AM pool.</li>
+     * <li>Any other state: fails the future, if it is still held, and throws an
+     *     AssertionError.</li>
+     * </ul>
+     * State is changed under the lock; the futures and the pool are called after releasing it.
+     */
+    @Override
+    public void onSuccess(WmTezSession session) {
+      SessionInitState oldState;
+      SettableFuture<WmTezSession> future = null;
+      lock.lock();
+      try {
+        oldState = state;
+        switch (oldState) {
+        case GETTING: {
+          LOG.debug("Received a session from AM pool {}", session);
+          session.setPoolName(poolName);
+          session.setQueueName(yarnQueue);
+          this.session = session;
+          this.state = SessionInitState.WAITING_FOR_REGISTRY;
+          break;
+        }
+        case WAITING_FOR_REGISTRY: {
+          assert this.session != null;
+          this.state = SessionInitState.DONE;
+          assert session == this.session;
+          future = this.future;
+          this.future = null;
+          break;
+        }
+        case CANCELED: {
+          future = this.future;
+          this.session = null;
+          this.future = null;
+          break;
+        }
+        default: {
+          future = this.future;
+          this.future = null;
+          break;
+        }
+        }
+      } finally {
+        lock.unlock();
+      }
+      switch (oldState) {
+      case GETTING: {
+        ListenableFuture<WmTezSession> waitFuture = session.waitForAmRegistryAsync(
+            amRegistryTimeoutMs, timeoutPool);
+        Futures.addCallback(waitFuture, this);
+        break;
+      }
+      case WAITING_FOR_REGISTRY: {
+        // Notify the master thread and the user.
+        future.set(session);
+        notifyInitializationCompleted(this);
+        break;
+      }
+      case CANCELED: {
+        // Return session to the pool; we can do it directly here.
+        future.setException(new HiveException(
+            "The query was killed by workload management: " + cancelReason));
+        session.setPoolName(null);
+        session.setClusterFraction(0f);
+        tezAmPool.returnSession(session);
+        break;
+      }
+      default: {
+        // Report the state observed under the lock; the field must not be read without the
+        // lock and may have changed since.
+        AssertionError error = new AssertionError("Unexpected state " + oldState);
+        // The future has already been handed off (and nulled) once the context is DONE;
+        // do not let a NullPointerException mask the assertion failure.
+        if (future != null) {
+          future.setException(error);
+        }
+        throw error;
+      }
+      }
+    }
+
+    /**
+     * Callback for a failed initialization step. Takes the future and any received session
+     * out of this context under the lock and, unless the context was already CANCELED, marks
+     * it DONE. After releasing the lock it fails the future with {@code t}, reports the
+     * failure to the master thread if the context was not CANCELED, and asks the AM pool to
+     * replace the session if one had been received.
+     */
+    @Override
+    public void onFailure(Throwable t) {
+      SettableFuture<WmTezSession> future;
+      WmTezSession session;
+      boolean wasCANCELED = false;
+      lock.lock();
+      try {
+        wasCANCELED = (state == SessionInitState.CANCELED);
+        session = this.session;
+        future = this.future;
+        this.future = null;
+        this.session = null;
+        if (!wasCANCELED) {
+          this.state = SessionInitState.DONE;
+        }
+      } finally {
+        lock.unlock();
+      }
+      // NOTE(review): assumes the future is still held, i.e. no callback completed it earlier.
+      future.setException(t);
+      if (!wasCANCELED) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Queueing the initialization failure with " + session);
+        }
+        notifyInitializationCompleted(this); // Report failure to the main thread.
+      }
+      if (session != null) {
+        session.clearWm();
+        // We can just restart the session if we have received one.
+        try {
+          tezAmPool.replaceSession(session, false, null);
+        } catch (Exception e) {
+          // The failure is only logged; the original error already went to the future.
+          LOG.error("Failed to restart a failed session", e);
+        }
+      }
+    }
+
+    /**
+     * Cancels the async operation (even if it's done), and returns the session if done.
+     * The context always ends up CANCELED with {@code cancelReason} recorded.
+     *
+     * @return the session held by this context if the previous state was DONE, which is null
+     *         when initialization had failed; null for every other previous state
+     */
+    public WmTezSession cancelAndExtractSessionIfDone(String cancelReason) {
+      lock.lock();
+      try {
+        // Local copy of the previous state; it shadows the field, which is overwritten next.
+        SessionInitState state = this.state;
+        this.state = SessionInitState.CANCELED;
+        this.cancelReason = cancelReason;
+        if (state == SessionInitState.DONE) {
+          WmTezSession result = this.session;
+          this.session = null;
+          return result;
+        } else {
+          // In the states where a background operation is in progress, wait for the callback.
+          // Also, ignore any duplicate calls; also don't kill failed ones - handled elsewhere.
+          if (state == SessionInitState.CANCELED) {
+            LOG.warn("Duplicate call to extract " + session);
+          }
+          return null;
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Extracts the session and cancels the operation, both only if done.
+     *
+     * @param results receives the session if the context is DONE and still holds one
+     * @return true if the context was DONE (it is CANCELED afterwards); false otherwise, in
+     *         which case nothing is changed
+     */
+    public boolean extractSessionAndCancelIfDone(List<WmTezSession> results) {
+      lock.lock();
+      try {
+        if (state != SessionInitState.DONE) return false;
+        this.state = SessionInitState.CANCELED;
+        if (this.session != null) {
+          results.add(this.session);
+        } // Otherwise we have failed; the callback has taken care of the failure.
+        this.session = null;
+        return true;
+      } finally {
+        lock.unlock();
+      }
     }
-    return counterNames;
   }
 
 
+
   // TODO: temporary until real WM schema is created.
   public static class TmpHivePool {
     private final String name;
     private final List<TmpHivePool> children;
     private final int queryParallelism;
     private final double resourceFraction;
+    private final List<Trigger> triggers;
 
+    /** Creates a pool with a null trigger list; delegates to the five-argument constructor. */
     public TmpHivePool(String name,
         List<TmpHivePool> children, int queryParallelism, double resourceFraction) {
+      this(name, children, queryParallelism, resourceFraction, null);
+    }
+
+    /**
+     * Creates a pool from the given values; all arguments are stored as is, without copying.
+     *
+     * @param triggers the pool's triggers; null when the four-argument constructor is used
+     */
+    public TmpHivePool(String name,
+        List<TmpHivePool> children, int queryParallelism, double resourceFraction,
+        List<Trigger> triggers) {
       this.name = name;
       this.children = children;
       this.queryParallelism = queryParallelism;
       this.resourceFraction = resourceFraction;
+      this.triggers = triggers;
     }
 
     public String getName() {
@@ -679,4 +1513,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       return mappings;
     }
   }
+
+  /** Returns the {@code tezAmPool} used by this workload manager. */
+  @VisibleForTesting
+  TezSessionPool<WmTezSession> getTezAmPool() {
+    return tezAmPool;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index 209cf57..2623a0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -92,23 +92,18 @@ public class LlapClusterStateForCompile {
     return numExecutorsPerNode;
   }
 
-  public boolean initClusterInfo() {
-    return initClusterInfo(true);
-  }
-
-  private boolean isUpdateNeeded(boolean allowUpdate) {
+  private boolean isUpdateNeeded() {
     Long lastUpdateLocal = lastClusterUpdateNs;
     if (lastUpdateLocal == null) return true;
-    if (!allowUpdate) return false;
     long elapsed = System.nanoTime() - lastUpdateLocal;
     return (elapsed >= updateIntervalNs);
   }
 
-  public boolean initClusterInfo(boolean allowUpdate) {
-    if (!isUpdateNeeded(allowUpdate)) return true;
+  public boolean initClusterInfo() {
+    if (!isUpdateNeeded()) return true;
     synchronized (updateInfoLock) {
       // At this point, no one will take the write lock and update, so we can do the last check.
-      if (!isUpdateNeeded(allowUpdate)) return true;
+      if (!isUpdateNeeded()) return true;
       if (svc == null) {
         try {
           svc = LlapRegistryService.getClient(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index 59efd43..5248454 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -19,14 +19,17 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 
-import java.util.Collection;
-import org.apache.hadoop.fs.Path;
+import com.google.common.util.concurrent.Futures;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
-import java.net.URISyntaxException;
-
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
 import javax.security.auth.login.LoginException;
-
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -45,6 +48,7 @@ public class SampleTezSessionState extends WmTezSession {
   private final HiveConf hiveConf;
   private String user;
   private boolean doAsEnabled;
+  private ListenableFuture<Boolean> waitForAmRegFuture;
 
   public SampleTezSessionState(
       String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) {
@@ -52,6 +56,13 @@ public class SampleTezSessionState extends WmTezSession {
         ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf);
     this.sessionId = sessionId;
     this.hiveConf = conf;
+    waitForAmRegFuture = createDefaultWaitForAmRegistryFuture();
+  }
+
+  /** Creates a future that is already completed with {@code true}. */
+  private SettableFuture<Boolean> createDefaultWaitForAmRegistryFuture() {
+    SettableFuture<Boolean> noWait = SettableFuture.create();
+    noWait.set(true); // By default, do not wait.
+    return noWait;
   }
 
   @Override
@@ -106,4 +117,27 @@ public class SampleTezSessionState extends WmTezSession {
   public boolean getDoAsEnabled() {
     return this.doAsEnabled;
   }
+
+  /**
+   * Returns a future driven by {@code waitForAmRegFuture}: it completes with this session
+   * when that future succeeds and fails with the same throwable when it fails.
+   * Neither {@code timeoutMs} nor {@code timeoutPool} is used here.
+   */
+  @Override
+  public SettableFuture<WmTezSession> waitForAmRegistryAsync(
+      int timeoutMs, ScheduledExecutorService timeoutPool) {
+    final SampleTezSessionState session = this;
+    final SettableFuture<WmTezSession> future = SettableFuture.create();
+    Futures.addCallback(waitForAmRegFuture, new FutureCallback<Boolean>() {
+      @Override
+      public void onSuccess(Boolean result) {
+        // The Boolean result itself is ignored; any success yields the session.
+        future.set(session);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        future.setException(t);
+      }
+    });
+    return future;
+  }
+
+  /**
+   * Sets the future that later {@code waitForAmRegistryAsync} calls wait on; passing null
+   * restores the default, already-completed future.
+   */
+  public void setWaitForAmRegistryFuture(ListenableFuture<Boolean> future) {
+    waitForAmRegFuture = future != null ? future : createDefaultWaitForAmRegistryFuture();
+  }
 }