You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/02 18:40:50 UTC

[1/3] hive git commit: HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master c5a9673a0 -> 77b99e4c9


http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 258a865..94f42dd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -23,27 +23,28 @@ import static org.junit.Assert.*;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.*;
 
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.SettableFuture;
 
 import com.google.common.collect.Lists;
-import java.io.IOException;
 import java.lang.Thread.State;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestWorkloadManager {
+  @SuppressWarnings("unused")
   private static final Logger LOG = LoggerFactory.getLogger(TestWorkloadManager.class);
 
   private final class GetSessionRunnable implements Runnable {
@@ -91,7 +92,7 @@ public class TestWorkloadManager {
     }
 
     @Override
-    public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions) {
+    public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions) {
       isCalled = true;
     }
 
@@ -105,12 +106,12 @@ public class TestWorkloadManager {
 
     public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
         QueryAllocationManager qam) {
-      super(yarnQueue, conf, qam, null, createDummyPlan(numSessions));
+      super(yarnQueue, conf, qam, createDummyPlan(numSessions));
     }
 
     public WorkloadManagerForTest(String yarnQueue, HiveConf conf,
         QueryAllocationManager qam, TmpResourcePlan plan) {
-      super(yarnQueue, conf, qam, null, plan);
+      super(yarnQueue, conf, qam, plan);
     }
 
     private static TmpResourcePlan createDummyPlan(int numSessions) {
@@ -120,13 +121,42 @@ public class TestWorkloadManager {
     }
 
     @Override
-    protected WmTezSession createSessionObject(String sessionId) {
-      return new SampleTezSessionState(sessionId, this, new HiveConf(getConf()));
+    protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
+      conf = conf == null ? new HiveConf(getConf()) : conf;
+      return new SampleTezSessionState(sessionId, this, conf);
+    }
+
+    @Override
+    public TezSessionState getSession(
+        TezSessionState session, String userName, HiveConf conf) throws Exception {
+      // We want to wait for the iteration to finish and set the cluster fraction.
+      TezSessionState state = super.getSession(session, userName, conf);
+      ensureWm();
+      return state;
     }
 
     @Override
-    protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
-      return true;
+    public void destroy(TezSessionState session) throws Exception {
+      super.destroy(session);
+      ensureWm();
+    }
+
+    private void ensureWm() throws InterruptedException, ExecutionException {
+      addTestEvent().get(); // Wait for the events to be processed.
+    }
+
+    @Override
+    public void returnAfterUse(TezSessionPoolSession session) throws Exception {
+      super.returnAfterUse(session);
+      ensureWm();
+    }
+
+    @Override
+    public TezSessionState reopen(
+        TezSessionState session, Configuration conf, String[] additionalFiles) throws Exception {
+      session = super.reopen(session, conf, additionalFiles);
+      ensureWm();
+      return session;
     }
   }
 
@@ -185,7 +215,8 @@ public class TestWorkloadManager {
     qam.assertWasCalled();
     WmTezSession session2 = (WmTezSession) session.reopen(conf, null);
     assertNotSame(session, session2);
-    assertEquals(1.0, session2.getClusterFraction(), EPSILON);
+    wm.addTestEvent().get();
+    assertEquals(session2.toString(), 1.0, session2.getClusterFraction(), EPSILON);
     assertEquals(0.0, session.getClusterFraction(), EPSILON);
     qam.assertWasCalled();
   }
@@ -273,7 +304,7 @@ public class TestWorkloadManager {
 
     Thread t1 = new Thread(new GetSessionRunnable(sessionA3, wm, error, conf, cdl, "A")),
         t2 = new Thread(new GetSessionRunnable(sessionA4, wm, error, conf, null, "A"));
-    waitForThreadToBlockOnQueue(cdl, t1);
+    waitForThreadToBlock(cdl, t1);
     t2.start();
     assertNull(sessionA3.get());
     assertNull(sessionA4.get());
@@ -327,7 +358,7 @@ public class TestWorkloadManager {
     CountDownLatch cdl = new CountDownLatch(1), cdl2 = new CountDownLatch(1);
     Thread t1 = new Thread(new GetSessionRunnable(session3, wm, error, conf, cdl, null), "t1"),
         t2 = new Thread(new GetSessionRunnable(session4, wm, error, conf, cdl2, null), "t2");
-    waitForThreadToBlockOnQueue(cdl, t1);
+    waitForThreadToBlock(cdl, t1);
     assertNull(session3.get());
     checkError(error);
     t2.start();
@@ -351,7 +382,7 @@ public class TestWorkloadManager {
     session4.get().returnToSessionManager();
   }
 
-  private void waitForThreadToBlockOnQueue(CountDownLatch cdl, Thread t1) throws InterruptedException {
+  private void waitForThreadToBlock(CountDownLatch cdl, Thread t1) throws InterruptedException {
     t1.start();
     cdl.await();
     // Wait for t1 to block, just be sure. Not ideal...
@@ -392,6 +423,308 @@ public class TestWorkloadManager {
     sessionA2.returnToSessionManager();
   }
 
+  @Test(timeout=10000)
+  public void testApplyPlanUserMapping() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 1, 0.5f), new TmpHivePool("B", null, 1, 0.5f)),
+        Lists.newArrayList(create("U", "A")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+ 
+    // One session will be running, the other will be queued in "A"
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "U", conf);
+    assertEquals("A", sessionA1.getPoolName());
+    assertEquals(0.5f, sessionA1.getClusterFraction(), EPSILON);
+    final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>();
+    final AtomicReference<Throwable> error = new AtomicReference<>();
+    final CountDownLatch cdl = new CountDownLatch(1);
+    Thread t1 = new Thread(new GetSessionRunnable(sessionA2, wm, error, conf, cdl, "U"));
+    waitForThreadToBlock(cdl, t1);
+    assertNull(sessionA2.get());
+    checkError(error);
+
+    // Now change the resource plan - change the mapping for the user.
+    plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 1, 0.6f), new TmpHivePool("B", null, 1, 0.4f)),
+        Lists.newArrayList(create("U", "B")));
+    wm.updateResourcePlanAsync(plan);
+
+    // The session will go to B with the new mapping; check it.
+    t1.join();
+    checkError(error);
+    assertNotNull(sessionA2.get());
+    assertEquals("B", sessionA2.get().getPoolName());
+    assertEquals(0.4f, sessionA2.get().getClusterFraction(), EPSILON);
+    // The new session will also go to B now.
+    sessionA2.get().returnToSessionManager();
+    WmTezSession sessionB1 = (WmTezSession) wm.getSession(null, "U", conf);
+    assertEquals("B", sessionB1.getPoolName());
+    assertEquals(0.4f, sessionB1.getClusterFraction(), EPSILON);
+    sessionA1.returnToSessionManager();
+    sessionB1.returnToSessionManager();
+  }
+
+
+  @Test(timeout=10000)
+  public void testApplyPlanQpChanges() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 1, 0.35f), new TmpHivePool("B", null, 2, 0.15f),
+        new TmpHivePool("C", null, 2, 0.3f), new TmpHivePool("D", null, 1, 0.3f)),
+        Lists.newArrayList(create("A", "A"), create("B", "B"),
+            create("C", "C"), create("D", "D")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+ 
+    // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
+    // Total: 5/6 running.
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, "A", conf),
+        sessionB1 = (WmTezSession) wm.getSession(null, "B", conf),
+        sessionB2 = (WmTezSession) wm.getSession(null, "B", conf),
+        sessionC1 = (WmTezSession) wm.getSession(null, "C", conf),
+        sessionD1 = (WmTezSession) wm.getSession(null, "D", conf);
+    final AtomicReference<WmTezSession> sessionA2 = new AtomicReference<>(),
+        sessionD2 = new AtomicReference<>();
+    final AtomicReference<Throwable> error = new AtomicReference<>();
+    final CountDownLatch cdl1 = new CountDownLatch(1), cdl2 = new CountDownLatch(1);
+    Thread t1 = new Thread(new GetSessionRunnable(sessionA2, wm, error, conf, cdl1, "A")),
+        t2 = new Thread(new GetSessionRunnable(sessionD2, wm, error, conf, cdl2, "D"));
+    waitForThreadToBlock(cdl1, t1);
+    waitForThreadToBlock(cdl2, t2);
+    checkError(error);
+    assertEquals(0.3f, sessionC1.getClusterFraction(), EPSILON);
+    assertEquals(0.3f, sessionD1.getClusterFraction(), EPSILON);
+
+    // Change the resource plan - resize B and C down, D up, and remove A, remapping users to B.
+    // Everything will be killed in A and B, C won't change, D will start one more query from
+    // the queue, and the query queued in A will be re-queued in B and started.
+    // The fractions will also all change.
+    // Total: 4/4 running.
+    plan = new TmpResourcePlan(Lists.newArrayList(new TmpHivePool("B", null, 1, 0.3f),
+        new TmpHivePool("C", null, 1, 0.2f), new TmpHivePool("D", null, 2, 0.5f)),
+        Lists.newArrayList(create("A", "B"), create("B", "B"),
+            create("C", "C"), create("D", "D")));
+    wm.updateResourcePlanAsync(plan);
+    wm.addTestEvent().get();
+
+    t1.join();
+    t2.join();
+    checkError(error);
+    assertNotNull(sessionA2.get());
+    assertNotNull(sessionD2.get());
+    assertEquals("D", sessionD2.get().getPoolName());
+    assertEquals("B", sessionA2.get().getPoolName());
+    assertEquals("C", sessionC1.getPoolName());
+    assertEquals(0.3f, sessionA2.get().getClusterFraction(), EPSILON);
+    assertEquals(0.2f, sessionC1.getClusterFraction(), EPSILON);
+    assertEquals(0.25f, sessionD1.getClusterFraction(), EPSILON);
+
+    assertKilledByWm(sessionA1);
+    assertKilledByWm(sessionB1);
+    assertKilledByWm(sessionB2);
+
+    // Wait for another iteration to make sure event gets processed for D2 to receive allocation.
+    sessionA2.get().returnToSessionManager();
+    assertEquals(0.25f, sessionD2.get().getClusterFraction(), EPSILON);
+    sessionD2.get().returnToSessionManager();
+    sessionC1.returnToSessionManager();
+    sessionD1.returnToSessionManager();
+
+    // Try to "return" stuff that was killed from "under" us. Should be a no-op.
+    sessionA1.returnToSessionManager();
+    sessionB1.returnToSessionManager();
+    sessionB2.returnToSessionManager(); 
+    assertEquals(4, wm.getTezAmPool().getCurrentSize());
+  }
+
+  @Test(timeout=10000)
+  public void testAmPoolInteractions() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+    // Take away the only session, as if it was expiring.
+    TezSessionPool<WmTezSession> pool = wm.getTezAmPool();
+    WmTezSession oob = pool.getSession();
+ 
+    final AtomicReference<WmTezSession> sessionA1 = new AtomicReference<>();
+    final AtomicReference<Throwable> error = new AtomicReference<>();
+    final CountDownLatch cdl1 = new CountDownLatch(1);
+    Thread t1 = new Thread(new GetSessionRunnable(sessionA1, wm, error, conf, cdl1, "A"));
+    waitForThreadToBlock(cdl1, t1);
+    checkError(error);
+    // Replacing it directly in the pool should unblock get.
+    pool.replaceSession(oob, false, null);
+    t1.join();
+    assertNotNull(sessionA1.get());
+    assertEquals("A", sessionA1.get().getPoolName());
+
+    // Increase qp, check that the pool grows.
+    plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 4, 1.0f)), Lists.newArrayList(create("A", "A")));
+    wm.updateResourcePlanAsync(plan);
+    WmTezSession oob2 = pool.getSession(),
+        oob3 = pool.getSession(),
+        oob4 = pool.getSession();
+    pool.returnSession(oob2);
+    assertEquals(1, pool.getCurrentSize());
+
+    // Decrease qp, check that the pool shrinks incl. killing the unused and returned sessions.
+    plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+    wm.updateResourcePlanAsync(plan);
+    wm.addTestEvent().get();
+    assertEquals(0, pool.getCurrentSize());
+     sessionA1.get().returnToSessionManager();
+    pool.returnSession(oob3);
+    assertEquals(0, pool.getCurrentSize());
+    pool.returnSession(oob4);
+    assertEquals(1, pool.getCurrentSize());
+
+    // Decrease, then increase qp - sessions should not be killed on return.
+    plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 2, 1.0f)), Lists.newArrayList(create("A", "A")));
+    wm.updateResourcePlanAsync(plan);
+    oob2 = pool.getSession();
+    oob3 = pool.getSession();
+    assertEquals(0, pool.getCurrentSize());
+    plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+    wm.updateResourcePlanAsync(plan);
+    plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 2, 1.0f)), Lists.newArrayList(create("A", "A")));
+    wm.updateResourcePlanAsync(plan);
+    wm.addTestEvent().get();
+    assertEquals(0, pool.getCurrentSize());
+    pool.returnSession(oob3);
+    pool.returnSession(oob4);
+    assertEquals(2, pool.getCurrentSize());
+  }
+
+  @Test(timeout=10000)
+  public void testAsyncSessionInitFailures() throws Exception {
+    final HiveConf conf = createConf();
+    MockQam qam = new MockQam();
+    TmpResourcePlan plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+
+    // Make sure session init gets stuck in init.
+    TezSessionPool<WmTezSession> pool = wm.getTezAmPool();
+    SampleTezSessionState theOnlySession = (SampleTezSessionState) pool.getSession();
+    SettableFuture<Boolean> blockedWait = SettableFuture.create();
+    theOnlySession.setWaitForAmRegistryFuture(blockedWait);
+    pool.returnSession(theOnlySession);
+    assertEquals(1, pool.getCurrentSize());
+
+    final AtomicReference<WmTezSession> sessionA = new AtomicReference<>();
+    final AtomicReference<Throwable> error = new AtomicReference<>();
+    CountDownLatch cdl = new CountDownLatch(1);
+    Thread t1 = new Thread(new GetSessionRunnable(sessionA, wm, error, conf, cdl, "A"));
+    waitForThreadToBlock(cdl, t1);
+    checkError(error);
+    wm.addTestEvent().get();
+    // The session is taken out of the pool, but is waiting for registration.
+    assertEquals(0, pool.getCurrentSize());
+
+    // Change the resource plan, so that the session gets killed.
+    plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("B", null, 1, 1.0f)), Lists.newArrayList(create("A", "B")));
+    wm.updateResourcePlanAsync(plan);
+    wm.addTestEvent().get();
+    blockedWait.set(true); // Meanwhile, the init succeeds!
+    t1.join();
+    try {
+      sessionA.get();
+      fail("Expected an error but got " + sessionA.get());
+    } catch (Throwable t) {
+      // Expected.
+    }
+    try {
+      // The get-session call should also fail.
+      checkError(error);
+      fail("Expected an error");
+    } catch (Exception ex) {
+      // Expected.
+    }
+    error.set(null);
+    theOnlySession = validatePoolAfterCleanup(theOnlySession, conf, wm, pool, "B");
+
+    // Initialization fails, no resource plan change.
+    SettableFuture<Boolean> failedWait = SettableFuture.create();
+    failedWait.setException(new Exception("foo"));
+    theOnlySession.setWaitForAmRegistryFuture(failedWait);
+    try {
+      TezSessionState r = wm.getSession(null, "A", conf);
+      fail("Expected an error but got " + r);
+    } catch (Exception ex) {
+      // Expected.
+    }
+    theOnlySession = validatePoolAfterCleanup(theOnlySession, conf, wm, pool, "B");
+
+    // Init fails, but the session is also killed by WM before that.
+    failedWait = SettableFuture.create();
+    theOnlySession.setWaitForAmRegistryFuture(failedWait);
+    sessionA.set(null);
+    cdl = new CountDownLatch(1);
+    t1 = new Thread(new GetSessionRunnable(sessionA, wm, error, conf, cdl, "A"));
+    waitForThreadToBlock(cdl, t1);
+    wm.addTestEvent().get();
+    // The session is taken out of the pool, but is waiting for registration.
+    assertEquals(0, pool.getCurrentSize());
+
+    plan = new TmpResourcePlan(Lists.newArrayList(
+        new TmpHivePool("A", null, 1, 1.0f)), Lists.newArrayList(create("A", "A")));
+    wm.updateResourcePlanAsync(plan);
+    wm.addTestEvent().get();
+    failedWait.setException(new Exception("moo")); // Meanwhile, the init fails.
+    t1.join();
+    try {
+      sessionA.get();
+      fail("Expected an error but got " + sessionA.get());
+    } catch (Throwable t) {
+      // Expected.
+    }
+    try {
+      // The get-session call should also fail.
+      checkError(error);
+      fail("Expected an error");
+    } catch (Exception ex) {
+      // Expected.
+    }
+    validatePoolAfterCleanup(theOnlySession, conf, wm, pool, "A");
+  }
+
+  private SampleTezSessionState validatePoolAfterCleanup(
+      SampleTezSessionState oldSession, HiveConf conf, WorkloadManager wm,
+      TezSessionPool<WmTezSession> pool, String sessionPoolName) throws Exception {
+    // Make sure the cleanup doesn't leave the pool without a session.
+    SampleTezSessionState theOnlySession = (SampleTezSessionState) pool.getSession();
+    assertNotNull(theOnlySession);
+    theOnlySession.setWaitForAmRegistryFuture(null);
+    assertNull(oldSession.getPoolName());
+    assertEquals(0f, oldSession.getClusterFraction(), EPSILON);
+    pool.returnSession(theOnlySession);
+    // Make sure we can actually get a session still - parallelism/etc. should not be affected.
+    WmTezSession result = (WmTezSession) wm.getSession(null, "A", conf);
+    assertEquals(sessionPoolName, result.getPoolName());
+    assertEquals(1f, result.getClusterFraction(), EPSILON);
+    result.returnToSessionManager();
+    return theOnlySession;
+  }
+
+  private void assertKilledByWm(WmTezSession session) {
+    assertNull(session.getPoolName());
+    assertEquals(0f, session.getClusterFraction(), EPSILON);
+    assertTrue(session.isIrrelevantForWm());
+  }
+
   private void checkError(final AtomicReference<Throwable> error) throws Exception {
     Throwable t = error.get();
     if (t == null) return;


[2/3] hive git commit: HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 35e5710..b0c6d58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -17,20 +17,34 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Semaphore;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -38,12 +52,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,13 +59,14 @@ import org.slf4j.LoggerFactory;
 
 /** Workload management entry point for HS2. */
 public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
-  implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
+    implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
   private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
   // TODO: this is a temporary setting that will go away, so it's not in HiveConf.
   public static final String TEST_WM_CONFIG = "hive.test.workload.management";
+  private static final char POOL_SEPARATOR = '/';
 
   private final HiveConf conf;
-  private final TezSessionPool<WmTezSession> sessions;
+  private final TezSessionPool<WmTezSession> tezAmPool;
   private final SessionExpirationTracker expirationTracker;
   private final RestrictedConfigChecker restrictedConfig;
   private final QueryAllocationManager allocationManager;
@@ -69,56 +78,50 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       new IdentityHashMap<>();
   private final int amRegistryTimeoutMs;
 
-
-  /** Sessions given out (i.e. between get... and return... calls), separated by Hive pool. */
-  private final ReentrantReadWriteLock poolsLock = new ReentrantReadWriteLock();
-  private final Map<String, PoolState> pools = new HashMap<>();
+  // Note: pools can only be modified by the master thread.
+  private HashMap<String, PoolState> pools;
   // Used to make sure that waiting getSessions don't block update.
-  private int internalPoolsVersion;
   private UserPoolMapping userPoolMapping;
+  private int totalQueryParallelism;
+  // We index the get requests to make sure there are no ordering artifacts when we requeue.
+  private final AtomicLong getRequestVersion = new AtomicLong(Long.MIN_VALUE);
 
   private SessionTriggerProvider sessionTriggerProvider;
   private TriggerActionHandler triggerActionHandler;
   private TriggerValidatorRunnable triggerValidatorRunnable;
 
-  public static class PoolState {
-    // Add stuff here as WM is implemented.
-    private final Object lock = new Object();
-    private final List<WmTezSession> sessions = new ArrayList<>();
-    private final Semaphore sessionsClaimed;
+  // Note: we could use RW lock to allow concurrent calls for different sessions, however all
+  //       those calls do is add elements to lists and maps; and we'd need to sync those
+  //       separately, plus have an object to notify because RW lock does not support conditions
+  //       in any sensible way. So, for now the lock is going to be epic.
+  private final ReentrantLock currentLock = new ReentrantLock();
+  private final Condition hasChangesCondition = currentLock.newCondition();
+  // The processing thread will switch between these two objects.
+  private final EventState one = new EventState(), two = new EventState();
+  private boolean hasChanges = false;
+  private EventState current = one;
 
-    private final String fullName;
-    private final double finalFraction;
-    private double finalFractionRemaining;
-    private final int queryParallelism;
-    private List<Trigger> triggers = new ArrayList<>();
-
-    public PoolState(String fullName, int queryParallelism, double fraction) {
-      this.fullName = fullName;
-      this.queryParallelism = queryParallelism;
-      // A fair semaphore to ensure correct queue order.
-      this.sessionsClaimed = new Semaphore(queryParallelism, true);
-      this.finalFraction = this.finalFractionRemaining = fraction;
-    }
+  /** The master thread that processes the events from EventState. */
+  @VisibleForTesting
+  protected final Thread wmThread;
+  /** Used by the master thread to offload calls blocking on something other than fast locks. */
+  private final ExecutorService workPool;
+  /** Used to schedule timeouts for some async operations. */
+  private final ScheduledExecutorService timeoutPool;
+  private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
 
+  @SuppressWarnings("rawtypes")
+  private final FutureCallback FATAL_ERROR_CALLBACK = new FutureCallback() {
     @Override
-    public String toString() {
-      return "[" + fullName + ", query parallelism " + queryParallelism
-          + ", fraction of the cluster " + finalFraction + ", fraction used by child pools "
-          + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size()
-          + "]";
-    }
-
-    @VisibleForTesting
-    // will change in HIVE-17809
-    public void setTriggers(final List<Trigger> triggers) {
-      this.triggers = triggers;
+    public void onSuccess(Object result) {
     }
 
-    public List<Trigger> getTriggers() {
-      return triggers;
+    @Override
+    public void onFailure(Throwable t) {
+      // TODO: shut down HS2?
+      LOG.error("Workload management fatal error", t);
     }
-  }
+  };
 
   // TODO: this is temporary before HiveServerEnvironment is merged.
   private static volatile WorkloadManager INSTANCE;
@@ -135,71 +138,86 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   /** Called once, when HS2 initializes. */
   public static WorkloadManager create(String yarnQueue, HiveConf conf, TmpResourcePlan plan) {
     assert INSTANCE == null;
-    Token<JobTokenIdentifier> amsToken = createAmsToken();
     // We could derive the expected number of AMs to pass in.
-    LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, amsToken, -1);
+    LlapPluginEndpointClient amComm = new LlapPluginEndpointClientImpl(conf, null, -1);
     QueryAllocationManager qam = new GuaranteedTasksAllocator(conf, amComm);
-    return (INSTANCE = new WorkloadManager(yarnQueue, conf, qam, amsToken, plan));
-  }
-
-  private static Token<JobTokenIdentifier> createAmsToken() {
-    if (!UserGroupInformation.isSecurityEnabled()) return null;
-    // This application ID is completely bogus.
-    ApplicationId id = ApplicationId.newInstance(
-        System.nanoTime(), (int)(System.nanoTime() % 100000));
-    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString()));
-    JobTokenSecretManager jobTokenManager = new JobTokenSecretManager();
-    Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jobTokenManager);
-    sessionToken.setService(identifier.getJobId());
-    return sessionToken;
+    return (INSTANCE = new WorkloadManager(yarnQueue, conf, qam, plan));
   }
 
   @VisibleForTesting
   WorkloadManager(String yarnQueue, HiveConf conf,
-      QueryAllocationManager qam, Token<JobTokenIdentifier> amsToken, TmpResourcePlan plan) {
+      QueryAllocationManager qam, TmpResourcePlan plan) {
     this.yarnQueue = yarnQueue;
     this.conf = conf;
-    int numSessions = initializeHivePools(plan);
-    LOG.info("Initializing with " + numSessions + " total query parallelism");
+    this.totalQueryParallelism = applyInitialResourcePlan(plan);
+    LOG.info("Initializing with " + totalQueryParallelism + " total query parallelism");
 
     this.amRegistryTimeoutMs = (int)HiveConf.getTimeVar(
         conf, ConfVars.HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT, TimeUnit.MILLISECONDS);
-    sessions = new TezSessionPool<>(conf, numSessions, true);
+    tezAmPool = new TezSessionPool<>(conf, totalQueryParallelism, true,
+        new TezSessionPool.SessionObjectFactory<WmTezSession>() {
+          @Override
+          public WmTezSession create(WmTezSession oldSession) {
+            return createSession(oldSession == null ? null : oldSession.getConf());
+          }
+    });
     restrictedConfig = new RestrictedConfigChecker(conf);
     allocationManager = qam;
     // Only creates the expiration tracker if expiration is configured.
     expirationTracker = SessionExpirationTracker.create(conf, this);
-    for (int i = 0; i < numSessions; i++) {
-      sessions.addInitialSession(createSession());
-    }
+    ThreadFactory workerFactory = new ThreadFactory() {
+      private final AtomicInteger threadNumber = new AtomicInteger(-1);
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r, "Workload management worker " + threadNumber.incrementAndGet());
+        t.setDaemon(true);
+        return t;
+      }
+    };
+    workPool = Executors.newFixedThreadPool(HiveConf.getIntVar(conf,
+        ConfVars.HIVE_SERVER2_TEZ_WM_WORKER_THREADS), workerFactory);
+    ThreadFactory timeoutFactory = new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(r, "Workload management timeout thread");
+        t.setDaemon(true);
+        return t;
+      }
+    };
+    timeoutPool = Executors.newScheduledThreadPool(1, timeoutFactory);
+
+    wmThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        runWmThread();
+      }
+    }, "Workload management master");
+    wmThread.setDaemon(true);
     // TODO: add support for per pool action handler and triggers fetcher (+atomic update to active triggers)
     sessionTriggerProvider = new SessionTriggerProvider();
     triggerActionHandler = new TriggerViolationActionHandler();
-    triggerValidatorRunnable = new TriggerValidatorRunnable(getSessionTriggerProvider(), getTriggerActionHandler());
+    triggerValidatorRunnable = new TriggerValidatorRunnable(
+        getSessionTriggerProvider(), getTriggerActionHandler());
     startTriggerValidator(conf);
   }
 
-  private int initializeHivePools(TmpResourcePlan plan) {
-    poolsLock.writeLock().lock();
-    try {
-      // FIXME: Add Triggers from metastore to poolstate
-      // Note: we assume here that plan has been validated beforehand, so we don't verify
-      //       that fractions or query parallelism add up.
-      int totalQueryParallelism = 0;
-      // Use recursion to update parents more conveniently; we don't expect a big tree.
-      for (TmpHivePool pool : plan.getRootPools()) {
-        totalQueryParallelism += addHivePool(pool, null);
-      }
-      this.userPoolMapping = new UserPoolMapping(plan.getMappings(), pools.keySet());
-      internalPoolsVersion = 0; // Initializing for the first time.
-      return totalQueryParallelism;
-    } finally {
-      poolsLock.writeLock().unlock();
+  // TODO: remove and let the thread handle it via normal ways?
+  private int applyInitialResourcePlan(TmpResourcePlan plan) {
+    int totalQueryParallelism = 0;
+    // Note: we assume here that plan has been validated beforehand, so we don't verify
+    //       that fractions or query parallelism add up.
+    this.userPoolMapping = new UserPoolMapping(plan.getMappings());
+    assert pools == null;
+    pools = new HashMap<>();
+    // Use recursion to update parents more conveniently; we don't expect a big tree.
+    for (TmpHivePool pool : plan.getRootPools()) {
+      totalQueryParallelism += addInitialHivePool(pool, null);
     }
+    return totalQueryParallelism;
   }
 
-  private final static char POOL_SEPARATOR = '/';
-  private int addHivePool(TmpHivePool pool, PoolState parent) {
+  // TODO: remove and let the thread handle it via normal ways?
+  private int addInitialHivePool(TmpHivePool pool, PoolState parent) {
     String fullName = pool.getName();
     int totalQueryParallelism = pool.getQueryParallelism();
     double fraction = pool.getResourceFraction();
@@ -211,178 +229,804 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     PoolState state = new PoolState(fullName, totalQueryParallelism, fraction);
     if (pool.getChildren() != null) {
       for (TmpHivePool child : pool.getChildren()) {
-        totalQueryParallelism += addHivePool(child, state);
+        totalQueryParallelism += addInitialHivePool(child, state);
       }
     }
-    LOG.info("Adding Hive pool: " + state);
+    state.setTriggers(pool.triggers);
+    LOG.info("Adding Hive pool: " + state + " with triggers " + pool.triggers);
     pools.put(fullName, state);
     return totalQueryParallelism;
   }
 
-  public TezSessionState getSession(
-      TezSessionState session, String userName, HiveConf conf) throws Exception {
-    validateConfig(conf);
-    WmTezSession result = checkSessionForReuse(session);
-    boolean hasAcquired = false;
-    String poolName = null;
-    while (!hasAcquired) { // This loop handles concurrent plan updates while we are waiting.
-      poolName = userPoolMapping.mapSessionToPoolName(userName);
-      if (poolName == null) {
-        throw new HiveException("Cannot find any pool mapping for user " + userName);
-      }
-      int internalVersion = -1;
-      Semaphore sessionsClaimed = null;
-      poolsLock.readLock().lock();
+  public void start() throws Exception {
+    tezAmPool.start();
+    if (expirationTracker != null) {
+      expirationTracker.start();
+    }
+    allocationManager.start();
+    wmThread.start();
+  }
+
+  public void stop() throws Exception {
+    List<TezSessionPoolSession> sessionsToClose = null;
+    synchronized (openSessions) {
+      sessionsToClose = new ArrayList<TezSessionPoolSession>(openSessions.keySet());
+    }
+    for (TezSessionState sessionState : sessionsToClose) {
+      sessionState.close(false);
+    }
+    if (expirationTracker != null) {
+      expirationTracker.stop();
+    }
+    allocationManager.stop();
+    if (wmThread != null) {
+      wmThread.interrupt();
+    }
+    workPool.shutdownNow();
+    timeoutPool.shutdownNow();
+
+    INSTANCE = null;
+  }
+
+  /** Represents a single iteration of work for the master thread. */
+  private final static class EventState {
+    private final Set<WmTezSession> toReturn = Sets.newIdentityHashSet(),
+        toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
+    private final LinkedList<SessionInitContext> initResults = new LinkedList<>();
+    private final IdentityHashMap<WmTezSession, SettableFuture<WmTezSession>> toReopen =
+        new IdentityHashMap<>();
+    private final LinkedList<GetRequest> getRequests = new LinkedList<>();
+    private final IdentityHashMap<WmTezSession, GetRequest> toReuse = new IdentityHashMap<>();
+    private TmpResourcePlan resourcePlanToApply = null;
+    private boolean hasClusterStateChanged = false;
+    private SettableFuture<Boolean> testEvent, applyRpFuture;
+  }
+
+  /**
+   * The work delegated from the master thread that doesn't have an async implementation
+   * (mostly opening and closing the sessions).
+   */
+  private final static class WmThreadSyncWork {
+    private LinkedList<WmTezSession> toRestartInUse = new LinkedList<>(),
+        toDestroyNoRestart = new LinkedList<>();
+  }
+
+  private void runWmThread() {
+    while (true) {
+      EventState currentEvents = null;
+      currentLock.lock();
       try {
-        PoolState pool = pools.get(poolName);
-        if (pool == null) throw new AssertionError("Pool " + poolName + " not found.");
-        // No need to take the pool lock, semaphore is final.
-        sessionsClaimed = pool.sessionsClaimed;
-        internalVersion = internalPoolsVersion;
-      } finally {
-        poolsLock.readLock().unlock();
-      }
-      // One cannot simply reuse the session if there are other queries waiting; to maintain
-      // fairness, we'll try to take the semaphore instantly, and if that fails we'll return
-      // this session back to the pool and potentially give the user a new session later.
-      if (result != null) {
-        // Handle the special case; the pool may be exactly at capacity w/o queue. In that
-        // case, we still should be able to reuse.
-        boolean isFromTheSamePool = false;
-        String oldPoolName = result.getPoolName();
-        if (poolName.equals(oldPoolName)) {
-          sessionsClaimed.release();
-          isFromTheSamePool = true;
-        }
-        // Note: we call timed acquire because untimed one ignores fairness.
-        hasAcquired = sessionsClaimed.tryAcquire(1, TimeUnit.MILLISECONDS);
-        if (hasAcquired) {
-          poolsLock.readLock().lock();
-          boolean doUnlock = true;
+        while (!hasChanges) {
           try {
-            if (internalVersion == internalPoolsVersion) {
-              if (!isFromTheSamePool) {
-                // Free up the usage in the old pool. TODO: ideally not under lock; not critical.
-                redistributePoolAllocations(oldPoolName, null, result, true);
-              }
-              doUnlock = false; // Do not unlock; see below.
-              break;
-            }
-          } finally {
-            if (doUnlock) {
-              poolsLock.readLock().unlock();
-            }
+            hasChangesCondition.await(1, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            LOG.warn("WM thread was interrupted and will now exit");
+            return;
           }
-          hasAcquired = false;
         }
-        // Note: we are short-circuiting session::returnToSessionManager to supply the flag
-        returnAfterUse(result, !isFromTheSamePool);
-        result = null;
+        hasChanges = false;
+        currentEvents = current;
+        current = (currentEvents == one) ? two : one;
+      } finally {
+        currentLock.unlock();
       }
-      // We don't expect frequent updates, so check every second.
-      while (!(hasAcquired = (hasAcquired || sessionsClaimed.tryAcquire(1, TimeUnit.SECONDS)))) {
-        poolsLock.readLock().lock();
-        try {
-          if (internalVersion != internalPoolsVersion) break;
-        } finally {
-          poolsLock.readLock().unlock();
+      try {
+        LOG.debug("Processing current events");
+        processCurrentEvents(currentEvents, syncWork);
+        scheduleWork(syncWork);
+      } catch (InterruptedException ex) {
+        LOG.warn("WM thread was interrupted and will now exit");
+        return;
+      } catch (Exception | AssertionError ex) {
+        LOG.error("WM thread encountered an error but will attempt to continue", ex);
+        if (currentEvents.testEvent != null) {
+          currentEvents.testEvent.setException(ex);
+          currentEvents.testEvent = null;
         }
+        if (currentEvents.applyRpFuture != null) {
+          currentEvents.applyRpFuture.setException(ex);
+          currentEvents.applyRpFuture = null;
+        }
+        // TODO: we either have to kill HS2 or, as the non-actor model would implicitly,
+        //       hope for the best and continue on other threads. Do the latter for now.
+        continue;
       }
-      if (!hasAcquired) continue;
-      // Keep it simple for now - everything between acquiring the semaphore and adding the session
-      // to the pool state is done under read lock, blocking pool updates. It's possible to make
-      // it more granular if needed. The only potentially lengthy operation is waiting for an
-      // expired session to be restarted in the session pool.
-      poolsLock.readLock().lock();
-      if (internalVersion == internalPoolsVersion) break;
-      poolsLock.readLock().unlock();
-      hasAcquired = false;
-    }
-    // We are holding the lock from the end of the loop.
-    try {
-      assert hasAcquired;
-      while (true) {
-        // TODO: ideally, we'd need to implement tryGet and deal with the valid wait from a session
-        //       restarting somehow, as opposed to the invalid case of a session missing from the
-        //       pool due to some bug. Keep a "restarting" counter in the pool?
-        boolean isFromTheSamePool = false;
-        if (result == null) {
-          result = sessions.getSession();
-        } else {
-          // If we are just reusing the session from the same pool, do not adjust allocations.
-          isFromTheSamePool = poolName.equals(result.getPoolName());
+    }
+  }
+
+  private void scheduleWork(WmThreadSyncWork context) {
+    // Do the work that cannot be done via async calls.
+
+    // 1. Restart pool sessions.
+    for (final WmTezSession toRestart : context.toRestartInUse) {
+      LOG.debug("Replacing " + toRestart + " with a new session");
+      workPool.submit(() -> {
+        try {
+          // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
+          tezAmPool.replaceSession(toRestart, false, null);
+        } catch (Exception ex) {
+          LOG.error("Failed to restart an old session; ignoring " + ex.getMessage());
         }
-        result.setQueueName(yarnQueue);
-        result.setPoolName(poolName);
-        if (!ensureAmIsRegistered(result)) continue; // Try another.
-        if (!isFromTheSamePool) {
-          redistributePoolAllocations(poolName, result, null, false);
+      });
+    }
+    context.toRestartInUse.clear();
+    // 2. Destroy the sessions that we don't need anymore.
+    for (final WmTezSession toDestroy : context.toDestroyNoRestart) {
+      LOG.debug("Closing " + toDestroy + " without restart");
+      workPool.submit(() -> {
+        try {
+          toDestroy.close(false);
+        } catch (Exception ex) {
+          LOG.error("Failed to close an old session; ignoring " + ex.getMessage());
         }
-        return result;
+      });
+    }
+    context.toDestroyNoRestart.clear();
+  }
+
+  private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throws Exception {
+    // The order of processing is as follows. We'd reclaim or kill all the sessions that we can
+    // reclaim from various user actions and errors, then apply the new plan if any,
+    // then give out all we can give out (restart, get and reopen callers) and rebalance the
+    // resource allocations in all the affected pools.
+    // For every session, we'd check all the concurrent things happening to it.
+
+    // TODO: also account for Tez-internal session restarts;
+    //       AM reg info changes; add notifications, ignore errors, and update alloc.
+    HashSet<String> poolsToRedistribute = new HashSet<>();
+
+    // 0. Handle initialization results.
+    for (SessionInitContext sw : e.initResults) {
+      handleInitResultOnMasterThread(sw, syncWork, poolsToRedistribute);
+    }
+    e.initResults.clear();
+
+    // 1. Handle sessions that are being destroyed by users. Destroy implies return.
+    for (WmTezSession sessionToDestroy : e.toDestroy) {
+      if (e.toReturn.remove(sessionToDestroy)) {
+        LOG.warn("The session was both destroyed and returned by the user; destroying");
       }
-    } finally {
-      poolsLock.readLock().unlock();
+      LOG.debug("Destroying {}", sessionToDestroy);
+      Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
+          e, sessionToDestroy, poolsToRedistribute);
+      if (shouldReturn == null || shouldReturn) {
+        // Restart if this session is still relevant, even if there's an internal error.
+        syncWork.toRestartInUse.add(sessionToDestroy);
+      }
+    }
+    e.toDestroy.clear();
+
+    // 2. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
+    for (WmTezSession sessionToReturn: e.toReturn) {
+      LOG.debug("Returning {}", sessionToReturn);
+      Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
+          e, sessionToReturn, poolsToRedistribute);
+      if (shouldReturn == null) {
+        // Restart if there's an internal error.
+        syncWork.toRestartInUse.add(sessionToReturn);
+        continue;
+      }
+      if (!shouldReturn) continue;
+      boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
+      if (!wasReturned) {
+        syncWork.toDestroyNoRestart.add(sessionToReturn);
+      }
+    }
+    e.toReturn.clear();
+
+    // 3. Reopen is essentially just destroy + get a new session for a session in use.
+    for (Map.Entry<WmTezSession, SettableFuture<WmTezSession>> entry : e.toReopen.entrySet()) {
+      LOG.debug("Reopening {}", entry.getKey());
+      handeReopenRequestOnMasterThread(
+          e, entry.getKey(), entry.getValue(), poolsToRedistribute, syncWork);
+    }
+    e.toReopen.clear();
+
+    // 4. All the sessions in use that were not destroyed or returned with a failed update now die.
+    for (WmTezSession sessionWithUpdateError : e.updateErrors) {
+      LOG.debug("Update failed for {}", sessionWithUpdateError);
+      handleUpdateErrorOnMasterThread(sessionWithUpdateError, e, syncWork, poolsToRedistribute);
+    }
+    e.updateErrors.clear();
+
+    // 5. Now apply a resource plan if any. This is expected to be pretty rare.
+    boolean hasRequeues = false;
+    if (e.resourcePlanToApply != null) {
+      LOG.debug("Applying new resource plan");
+      int getReqCount = e.getRequests.size();
+      applyNewResourcePlanOnMasterThread(e, syncWork, poolsToRedistribute);
+      hasRequeues = getReqCount != e.getRequests.size();
+    }
+    e.resourcePlanToApply = null;
+
+    // 6. Handle all the get/reuse requests. We won't actually give out anything here, but merely
+    //    map all the requests and place them in an appropriate order in pool queues. The only
+    //    exception is the reuse without queue contention; can be granted immediately. If we can't
+    //    reuse the session immediately, we will convert the reuse to a normal get, because we
+    //    want query level fairness, and don't want the get in queue to hold up a session.
+    GetRequest req;
+    while ((req = e.getRequests.pollFirst()) != null) {
+      LOG.debug("Processing a new get request from " + req.userName);
+      queueGetRequestOnMasterThread(req, poolsToRedistribute, syncWork);
+    }
+    e.toReuse.clear();
+
+    // 7. If there was a cluster state change, make sure we redistribute all the pools.
+    if (e.hasClusterStateChanged) {
+      LOG.debug("Processing a cluster state change");
+      poolsToRedistribute.addAll(pools.keySet());
+      e.hasClusterStateChanged = false;
+    }
+
+    // 8. Finally, for all the pools that have changes, promote queued queries and rebalance.
+    for (String poolName : poolsToRedistribute) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Processing changes for pool " + poolName + ": " + pools.get(poolName));
+      }
+      processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
+    }
+
+    // 9. Notify tests and global async ops.
+    if (e.testEvent != null) {
+      e.testEvent.set(true);
+      e.testEvent = null;
+    }
+    if (e.applyRpFuture != null) {
+      e.applyRpFuture.set(true);
+      e.applyRpFuture = null;
     }
   }
 
-  @VisibleForTesting
-  protected boolean ensureAmIsRegistered(WmTezSession session) throws Exception {
-    // Make sure AM is ready to use and registered with AM registry.
+  // ========= Master thread methods
+
+  private void handleInitResultOnMasterThread(
+      SessionInitContext sw, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    // For the failures, the users have been notified, we just need to clean up. There's no
+    // session here (or it's unused), so no conflicts are possible. We just remove it.
+    // For successes, the user has also been notified, so various requests are also possible;
+    // however, to start, we'd just put the session into the sessions list and go from there.
+    WmTezSession session = null;
+    sw.lock.lock();
     try {
-      session.waitForAmPluginInfo(amRegistryTimeoutMs);
-    } catch (TimeoutException ex) {
-      LOG.error("Timed out waiting for AM registry information for " + session.getSessionId());
-      session.destroy();
+      if (sw.state == SessionInitState.CANCELED) {
+        // We have processed this on the previous run, after it has already queued the message.
+        return;
+      }
+      assert sw.state == SessionInitState.DONE;
+      session = sw.session;
+      sw.session = null;
+    } finally {
+      sw.lock.unlock();
+    }
+    LOG.debug("Processing " + ((session == null) ? "failed" : "successful")
+        + " initialization result for pool " + sw.poolName);
+    // We could not have removed the pool for this session, or we would have CANCELED the init.
+    PoolState pool = pools.get(sw.poolName);
+    if (pool == null || !pool.initializingSessions.remove(sw)) {
+      // Query parallelism might be fubar.
+      LOG.error("Cannot remove initializing session from the pool "
+          + sw.poolName + " - internal error");
+    }
+    poolsToRedistribute.add(sw.poolName);
+    if (session != null) {
+      if (pool != null) {
+        pool.sessions.add(session);
+      } else {
+        LOG.error("Cannot add new session to the pool "
+          + sw.poolName + " because it was removed unexpectedly - internal error " + session);
+        syncWork.toRestartInUse.add(session);
+      }
+    }
+  }
+
+  private Boolean handleReturnedInUseSessionOnMasterThread(
+      EventState e, WmTezSession session, HashSet<String> poolsToRedistribute) {
+    // This handles the common logic for destroy and return - everything except
+    // the invalid combination of destroy and return themselves, as well as the actual
+    // statement that destroys or returns it.
+    if (e.updateErrors.remove(session)) {
+      LOG.debug("Ignoring an update error for a session being destroyed or returned");
+    }
+    SettableFuture<WmTezSession> future = e.toReopen.remove(session);
+    if (future != null) {
+      future.setException(new AssertionError("Invalid reopen attempt"));
+    }
+    GetRequest reuseRequest = e.toReuse.remove(session);
+    if (reuseRequest != null) {
+      reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
+    }
+    return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
+  }
+
+  private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
+      SettableFuture<WmTezSession> future, HashSet<String> poolsToRedistribute,
+      WmThreadSyncWork syncWork) throws Exception {
+    if (e.updateErrors.remove(session)) {
+      LOG.debug("Ignoring an update error for a session being reopened");
+    }
+    GetRequest reuseRequest = e.toReuse.remove(session);
+    if (reuseRequest != null) {
+      reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
+    }
+    // In order to expedite things in a general case, we are not actually going to reopen
+    // anything. Instead, we will try to give out an existing session from the pool, and restart
+    // the problematic one in background.
+    String poolName = session.getPoolName();
+    Boolean isRemoved = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
+    // If we fail to remove, it's probably an internal error. We'd try to handle it the same way
+    // as above - by restarting the session. We'd fail the caller to avoid exceeding parallelism.
+    if (isRemoved == null) {
+      future.setException(new RuntimeException("Reopen failed due to an internal error"));
+      syncWork.toRestartInUse.add(session);
+      return;
+    } else if (!isRemoved) {
+      future.setException(new RuntimeException("WM killed this session during reopen: "
+          + session.getReasonForKill()));
+      return; // No longer relevant for WM - bail.
+    }
+    // If the pool didn't exist, checkAndRemoveSessionFromItsPool would have returned null.
+    PoolState pool = pools.get(poolName);
+    SessionInitContext sw = new SessionInitContext(future, poolName);
+    // We have just removed the session from the same pool, so don't check concurrency here.
+    pool.initializingSessions.add(sw);
+    ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+    Futures.addCallback(getFuture, sw);
+    syncWork.toRestartInUse.add(session);
+  }
+
+  private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError,
+      EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    GetRequest reuseRequest = e.toReuse.remove(sessionWithUpdateError);
+    if (reuseRequest != null) {
+      // This session is bad, so don't allow reuse; just convert it to normal get.
+      reuseRequest.sessionToReuse = null;
+    }
+    // TODO: we should communicate this to the user more explicitly (use kill query API, or
+    //       add an option for bg kill checking to TezTask/monitor?)
+    // We are assuming the update-error AM is bad and just try to kill it.
+    Boolean isRemoved = checkAndRemoveSessionFromItsPool(sessionWithUpdateError, poolsToRedistribute);
+    if (isRemoved != null && !isRemoved) {
+      // An update error for some session that was actually already killed by us.
+      return;
+    }
+    // Regardless of whether it was removed successfully or after failing to remove, restart it.
+    // Since we just restart this from under the user, mark it so we handle it properly when
+    // the user tries to actually use this session and fails, proceeding to return/destroy it.
+    // TODO: propagate this error to TezJobMonitor somehow, after we add the use of KillQuery.
+    sessionWithUpdateError.setIsIrrelevantForWm("Failed to update resource allocation");
+    syncWork.toRestartInUse.add(sessionWithUpdateError);
+  }
+
+  private void applyNewResourcePlanOnMasterThread(
+      EventState e, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    int totalQueryParallelism = 0;
+    // FIXME: Add Triggers from metastore to poolstate
+    // Note: we assume here that plan has been validated beforehand, so we don't verify
+    //       that fractions or query parallelism add up, etc.
+    this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings());
+    HashMap<String, PoolState> oldPools = pools;
+    pools = new HashMap<>();
+    // Use recursion to update parents more conveniently; we don't expect a big tree.
+    for (TmpHivePool pool : e.resourcePlanToApply.getRootPools()) {
+      totalQueryParallelism += addHivePool(
+          pool, null, oldPools, syncWork.toRestartInUse, poolsToRedistribute, e);
+    }
+    if (oldPools != null && !oldPools.isEmpty()) {
+      // Looks like some pools were removed; insert queued queries into the front of get reqs.
+      for (PoolState oldPool : oldPools.values()) {
+        oldPool.destroy(syncWork.toRestartInUse, e.getRequests, e.toReuse);
+      }
+    }
+
+    LOG.info("Updating with " + totalQueryParallelism + " total query parallelism");
+    int deltaSessions = totalQueryParallelism - this.totalQueryParallelism;
+    this.totalQueryParallelism = totalQueryParallelism;
+    if (deltaSessions == 0) return; // Nothing to do.
+    if (deltaSessions < 0) {
+      // First, see if we have unused sessions that we were planning to restart; get rid of those.
+      int toTransfer = Math.min(-deltaSessions, syncWork.toRestartInUse.size());
+      for (int i = 0; i < toTransfer; ++i) {
+        syncWork.toDestroyNoRestart.add(syncWork.toRestartInUse.pollFirst());
+      }
+      deltaSessions += toTransfer;
+    }
+    if (deltaSessions != 0) {
+      failOnFutureFailure(tezAmPool.resizeAsync(
+          deltaSessions, syncWork.toDestroyNoRestart));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void failOnFutureFailure(ListenableFuture<?> future) {
+    Futures.addCallback(future, FATAL_ERROR_CALLBACK);
+  }
+
+  /**
+   * Processes a single get request on the master thread: maps the user to a pool, then either
+   * hands the reused session back immediately or queues the request in that pool.
+   * Fails the request future if there is no pool mapping for the user, or if the mapped pool
+   * is not found.
+   * @param req The request to process; its sessionToReuse may be cleared by this method.
+   * @param poolsToRedistribute Names of the pools that need to be reprocessed; the pools
+   *                            affected by this request are added to it.
+   * @param syncWork Receives the sessions that could not be returned to the AM pool.
+   */
+  private void queueGetRequestOnMasterThread(
+      GetRequest req, HashSet<String> poolsToRedistribute, WmThreadSyncWork syncWork) {
+    String poolName = userPoolMapping.mapSessionToPoolName(req.userName);
+    if (poolName == null) {
+      req.future.setException(new HiveException(
+          "Cannot find any pool mapping for user " + req.userName));
+      returnSessionOnFailedReuse(req, syncWork, poolsToRedistribute);
+      return;
+    }
+    PoolState pool = pools.get(poolName);
+    if (pool == null) {
+      req.future.setException(new AssertionError(poolName + " not found (internal error)."));
+      returnSessionOnFailedReuse(req, syncWork, poolsToRedistribute);
+      return;
+    }
+
+    PoolState oldPool = null;
+    if (req.sessionToReuse != null) {
+      // Given that we are trying to reuse, this session MUST be in some pool.sessions.
+      // Kills that could have removed it must have cleared sessionToReuse.
+      String oldPoolName = req.sessionToReuse.getPoolName();
+      oldPool = pools.get(oldPoolName);
+      Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
+      if (isRemoved == null || !isRemoved) {
+        // This is probably an internal error... abandon the reuse attempt.
+        // Null is passed for the pools because the removal was already attempted above.
+        returnSessionOnFailedReuse(req, syncWork, null);
+        req.sessionToReuse = null;
+      } else if (pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism) {
+        // One cannot simply reuse the session if there are other queries waiting; to maintain
+        // fairness, we'll try to take a query slot instantly, and if that fails we'll return
+        // this session back to the pool and give the user a new session later.
+        returnSessionOnFailedReuse(req, syncWork, null);
+        req.sessionToReuse = null;
+      }
+    }
+
+    if (req.sessionToReuse != null) {
+      // If we can immediately reuse a session, there's nothing to wait for - just return.
+      req.sessionToReuse.setPoolName(poolName);
+      req.sessionToReuse.setQueueName(yarnQueue);
+      pool.sessions.add(req.sessionToReuse);
+      if (pool != oldPool) {
+        // The session moved between pools; the old pool was already added during the removal.
+        poolsToRedistribute.add(poolName);
+      }
+      req.future.set(req.sessionToReuse);
+      return;
+    }
+    // Otherwise, queue the session and make sure we update this pool.
+    pool.queue.addLast(req);
+    poolsToRedistribute.add(poolName);
+  }
+
+
+  /**
+   * Applies the pending changes to one pool on the master thread: starts as many queued
+   * requests as the query parallelism allows, recomputes the session allocations, and
+   * refreshes the trigger provider with the sessions and triggers of this pool.
+   * @param poolName The pool to process; names not present in the pool map are ignored.
+   * @param context Not referenced by this method.
+   * @param hasRequeues Whether requests may have been added to the queue out of order, in
+   *                    which case the queue is sorted by the request order first.
+   */
+  private void processPoolChangesOnMasterThread(
+      String poolName, WmThreadSyncWork context, boolean hasRequeues) throws Exception {
+    PoolState pool = pools.get(poolName);
+    if (pool == null) return; // Might be from before the new resource plan.
+
+    // 1. First, start the queries from the queue.
+    // The count can be zero or negative if the pool is already at or over its parallelism.
+    int queriesToStart = Math.min(pool.queue.size(),
+        pool.queryParallelism - pool.getTotalActiveSessions());
+    if (queriesToStart > 0) {
+      LOG.debug("Starting {} queries in pool {}", queriesToStart, pool);
+    }
+    if (hasRequeues) {
+      // Sort the queue - we may have put items here out of order.
+      Collections.sort(pool.queue, GetRequest.ORDER_COMPARATOR);
+    }
+    for (int i = 0; i < queriesToStart; ++i) {
+      GetRequest queueReq = pool.queue.pollFirst();
+      assert queueReq.sessionToReuse == null;
+      // Note that in theory, we are guaranteed to have a session waiting for us here, but
+      // the expiration, failures, etc. may cause one to be missing pending restart.
+      // See SessionInitContext javadoc.
+      SessionInitContext sw = new SessionInitContext(queueReq.future, poolName);
+      ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+      Futures.addCallback(getFuture, sw);
+      // It is possible that all the async methods returned on the same thread because the
+      // session with registry data and stuff was available in the pool.
+      // If this happens, we'll take the session out here and "cancel" the init so we skip
+      // processing the message that the successful init has queued for us.
+      boolean isDone = sw.extractSessionAndCancelIfDone(pool.sessions);
+      if (!isDone) {
+        pool.initializingSessions.add(sw);
+      }
+      // The user has already been notified of completion by SessionInitContext.
+    }
+
+    // 2. Then, update pool allocations.
+    double totalAlloc = pool.updateAllocationPercentages();
+    // We are calling this here because we expect the method to be completely async. We also don't
+    // want this call itself to go on a thread because we want the percent-to-physics conversion
+    // logic to be consistent between all the separate calls in one master thread processing round.
+    // Note: If allocation manager does not have cluster state, it won't update anything. When the
+    //       cluster state changes, it will notify us, and we'd update the queries again.
+    allocationManager.updateSessionsAsync(totalAlloc, pool.sessions);
+
+    // 3. Update triggers for this pool.
+    //    TODO: need to merge with per-pool enforcement, it will only work for one pool for now.
+    if (sessionTriggerProvider != null) {
+      sessionTriggerProvider.setOpenSessions(
+          Collections.<TezSessionState>unmodifiableList(pool.sessions));
+      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(pool.triggers));
+    }
+  }
+
+  /**
+   * Gives the session attached to a failed reuse request back to the AM pool.
+   * Does nothing if the request has no session to reuse. If poolsToRedistribute is provided,
+   * the session is first removed from its WM pool; callers that have already attempted the
+   * removal pass null. If the AM pool does not accept the session, it is queued in syncWork
+   * to be destroyed without a restart.
+   */
+  private void returnSessionOnFailedReuse(
+      GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    WmTezSession reusedSession = req.sessionToReuse;
+    if (reusedSession == null) return;
+    if (poolsToRedistribute != null) {
+      Boolean wasRemoved = checkAndRemoveSessionFromItsPool(reusedSession, poolsToRedistribute);
+      // The session cannot have been killed; this happens after all the kills in the current
+      // iteration, so we would have cleared sessionToReuse when killing this.
+      assert wasRemoved == null || wasRemoved;
+    }
+    boolean isAccepted = tezAmPool.returnSessionAsync(reusedSession);
+    if (!isAccepted) {
+      syncWork.toDestroyNoRestart.add(reusedSession);
+    }
+    req.sessionToReuse = null;
+  }
+
+  private int addHivePool(TmpHivePool pool, PoolState parent,
+      HashMap<String, PoolState> oldPools, List<WmTezSession> toKill,
+      HashSet<String> poolsToRedistribute, EventState e) {
+    String fullName = pool.getName();
+    int totalQueryParallelism = pool.getQueryParallelism();
+    double fraction = pool.getResourceFraction();
+    if (parent != null) {
+      fullName = parent.fullName + POOL_SEPARATOR + fullName;
+      fraction = parent.finalFraction * pool.getResourceFraction();
+      parent.finalFractionRemaining -= fraction;
+    }
+    PoolState state = oldPools == null ? null : oldPools.remove(fullName);
+    if (state == null) {
+      state = new PoolState(fullName, totalQueryParallelism, fraction);
+    } else {
+      // This will also take care of the queries if query parallelism changed.
+      state.update(totalQueryParallelism, fraction, toKill, e);
+      poolsToRedistribute.add(fullName);
+    }
+    state.setTriggers(pool.triggers);
+
+    if (pool.getChildren() != null) {
+      for (TmpHivePool child : pool.getChildren()) {
+        totalQueryParallelism += addHivePool(
+            child, state, oldPools, toKill, poolsToRedistribute, e);
+      }
+    }
+    LOG.info("Adding Hive pool: " + state + " with triggers " + pool.triggers);
+    pools.put(fullName, state);
+    return totalQueryParallelism;
+  }
+
+
+  /**
+   * Checks if the session is still relevant for WM and if yes, removes it from its pool.
+   * The name of the pool the session was in, if any, is added to poolsToRedistribute.
+   * @return true if the session was removed; false if the session was already processed by WM
+   *         thread (so we are dealing with an outdated request); null if the session should be
+   *         in WM but wasn't found in the requisite pool (internal error?).
+   */
+  private Boolean checkAndRemoveSessionFromItsPool(
+      WmTezSession session, HashSet<String> poolsToRedistribute) {
+    // It is possible for some request to be queued after a main thread has decided to kill this
+    // session; on the next iteration, we'd be processing that request with an irrelevant session.
+    if (session.isIrrelevantForWm()) {
       return false;
     }
-    return true;
+    // If we did not kill this session we expect everything to be present.
+    // NOTE(review): the pool name is read before clearWm, presumably because clearWm resets it.
+    String poolName = session.getPoolName();
+    session.clearWm();
+    if (poolName != null) {
+      poolsToRedistribute.add(poolName);
+      PoolState pool = pools.get(poolName);
+      if (pool != null && pool.sessions.remove(session)) return true;
+    }
+    LOG.error("Session was not in the pool (internal error) " + poolName + ": " + session);
+    return null;
   }
 
-  private void redistributePoolAllocations(
-      String poolName, WmTezSession sessionToAdd, WmTezSession sessionToRemove,
-      boolean releaseParallelism) {
-    List<WmTezSession> sessionsToUpdate = null;
-    double totalAlloc = 0;
-    assert sessionToAdd == null || poolName.equals(sessionToAdd.getPoolName());
-    assert sessionToRemove == null || poolName.equals(sessionToRemove.getPoolName());
-    poolsLock.readLock().lock();
-    boolean hasRemoveFailed = false;
+  // ===== EVENT METHODS
+
+  /**
+   * Queues a resource plan to be applied and notifies the WM thread.
+   * If another plan is still pending, it is superseded by this one and its future is failed
+   * with a HiveException.
+   * @param plan The resource plan to apply.
+   * @return The future stored with the pending plan; it is not completed by this method.
+   */
+  public Future<Boolean> updateResourcePlanAsync(TmpResourcePlan plan) {
+    SettableFuture<Boolean> applyRpFuture = SettableFuture.create();
+    currentLock.lock();
     try {
-      PoolState pool = pools.get(poolName);
-      synchronized (pool.lock) {
-        // This should be a 2nd order fn but it's too much pain in Java for one LOC.
-        if (sessionToAdd != null) {
-          pool.sessions.add(sessionToAdd);
-        }
-        if (sessionToRemove != null) {
-          // TODO: this assumes that the update process will take the write lock, and make
-          //       everything right w.r.t. semaphores, pool names and other stuff, since we might
-          //       be releasing a different semaphore from the one we acquired if it's across
-          //       the update. If the magic in the update is weak, this may become more involved.
-          if (!pool.sessions.remove(sessionToRemove)) {
-            LOG.error("Session " + sessionToRemove + " could not be removed from the pool");
-            if (releaseParallelism) {
-              hasRemoveFailed = true;
-            }
-          } else if (releaseParallelism) {
-            pool.sessionsClaimed.release();
-          }
-          sessionToRemove.setClusterFraction(0);
-        }
-        totalAlloc = updatePoolAllocations(pool.sessions, pool.finalFractionRemaining);
-        sessionsToUpdate = new ArrayList<>(pool.sessions);
+      // TODO: if there's versioning/etc., it will come in here. For now we rely on external
+      //       locking or ordering of calls. This should potentially return a Future for that.
+      if (current.resourcePlanToApply != null) {
+        LOG.warn("Several resource plans are being applied at the same time; using the latest");
+        // Fail the future of the plan that is being replaced so its caller does not wait forever.
+        current.applyRpFuture.setException(
+            new HiveException("Another plan was applied in parallel"));
+      }
+      current.resourcePlanToApply = plan;
+      current.applyRpFuture = applyRpFuture;
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+    return applyRpFuture;
+  }
+
+  /** A pending request for a session; requests are ordered by their order number. */
+  private final static class GetRequest {
+    private final long order;
+    private final String userName;
+    private final SettableFuture<WmTezSession> future;
+    private WmTezSession sessionToReuse;
+
+    /** Sorts the requests by ascending order number. */
+    public static final Comparator<GetRequest> ORDER_COMPARATOR = new Comparator<GetRequest>() {
+      @Override
+      public int compare(GetRequest left, GetRequest right) {
+        return Long.compare(left.order, right.order);
+      }
+    };
+
+    private GetRequest(String userName, SettableFuture<WmTezSession> future,
+        WmTezSession sessionToReuse, long order) {
+      this.order = order;
+      this.userName = userName;
+      this.future = future;
+      this.sessionToReuse = sessionToReuse;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("[#");
+      sb.append(order).append(", ").append(userName);
+      sb.append(", reuse ").append(sessionToReuse).append("]");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Queues a request for a session and blocks until the request future is completed.
+   * @param session An existing session to try to reuse; may be null.
+   * @param userName The user name stored in the request; the master thread maps it to a pool.
+   * @param conf The configuration to validate; not used to configure the pool sessions.
+   * @return The session the request future was completed with.
+   * @throws Exception If the config is not valid, or if waiting on the future fails.
+   */
+  public TezSessionState getSession(
+      TezSessionState session, String userName, HiveConf conf) throws Exception {
+    // Note: not actually used for pool sessions; verify some things like doAs are not set.
+    validateConfig(conf);
+    SettableFuture<WmTezSession> future = SettableFuture.create();
+    WmTezSession wmSession = checkSessionForReuse(session);
+    GetRequest req = new GetRequest(
+        userName, future, wmSession, getRequestVersion.incrementAndGet());
+    currentLock.lock();
+    try {
+      current.getRequests.add(req);
+      if (req.sessionToReuse != null) {
+        // Note: we assume reuse is only possible for the same user and config.
+        current.toReuse.put(wmSession, req);
       }
+      notifyWmThreadUnderLock();
     } finally {
-      poolsLock.readLock().unlock();
+      currentLock.unlock();
     }
-    allocationManager.updateSessionsAsync(totalAlloc, sessionsToUpdate);
-    updateSessionsTriggers();
-    if (hasRemoveFailed) {
-      throw new AssertionError("Cannot remove the session from the pool and release "
-          + "the query slot; HS2 may fail to accept queries");
+    // Wait outside of the lock.
+    return future.get();
+  }
+
+  /**
+   * Detaches the session from the SessionState of the calling thread, then queues it in the
+   * destroy list and notifies the WM thread. Does not wait for the session to be processed.
+   */
+  @Override
+  public void destroy(TezSessionState session) throws Exception {
+    WmTezSession wmTezSession = ensureOwnedSession(session);
+    resetGlobalTezSession(wmTezSession);
+    currentLock.lock();
+    try {
+      current.toDestroy.add(wmTezSession);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
     }
   }
 
+  /** Clears the Tez session of the current SessionState if it is the given session. */
+  private void resetGlobalTezSession(WmTezSession wmTezSession) {
+    // This has to be done synchronously to avoid the caller getting this session again.
+    // Ideally we'd get rid of this thread-local nonsense.
+    SessionState threadState = SessionState.get();
+    if (threadState == null || threadState.getTezSession() != wmTezSession) return;
+    threadState.setTezSession(null);
+  }
+
+  /**
+   * Detaches the session from the SessionState of the calling thread, then queues it in the
+   * return list and notifies the WM thread. Does not wait for the session to be processed.
+   */
+  @Override
+  public void returnAfterUse(TezSessionPoolSession session) throws Exception {
+    WmTezSession wmTezSession = ensureOwnedSession(session);
+    resetGlobalTezSession(wmTezSession);
+    currentLock.lock();
+    try {
+      current.toReturn.add(wmTezSession);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
+  // TODO: use this
+  /**
+   * Flags the cluster state as changed in the pending event state and notifies the WM thread.
+   * NOTE(review): the method name has a typo ("nofity"); renaming it would change the public API.
+   */
+  public void nofityOfClusterStateChange() {
+    currentLock.lock();
+    try {
+      current.hasClusterStateChanged = true;
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
+  /**
+   * Adds the session to the pending list of update errors and notifies the WM thread.
+   * NOTE(review): presumably called when updating the session allocation fails - confirm
+   * against the callers.
+   */
+  public void addUpdateError(WmTezSession wmTezSession) {
+    currentLock.lock();
+    try {
+      current.updateErrors.add(wmTezSession);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
+  /**
+   * Adds a test event that's processed at the end of WM iteration.
+   * This allows tests to wait for an iteration to finish without messing with the threading
+   * logic (that is prone to races if we e.g. remember the state before and wait for it to change,
+   * self-deadlocking when triggering things explicitly and calling a blocking API, and hanging
+   * forever if we wait for "another iteration"). If addTestEvent is called after all the other
+   * calls of interest, it is guaranteed that the events from those calls will be processed
+   * fully when the future is triggered.
+   * Note that the event is stored in a single field, so a call made before the previous event
+   * was picked up replaces that event.
+   * @return The future for the test event.
+   */
+  @VisibleForTesting
+  Future<Boolean> addTestEvent() {
+    SettableFuture<Boolean> testEvent = SettableFuture.create();
+    currentLock.lock();
+    try {
+      current.testEvent = testEvent;
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+    return testEvent;
+  }
+
+  /**
+   * Adds the init context to the pending list of init results and notifies the WM thread.
+   * SessionInitContext calls this for both the successful and the failed initialization.
+   */
+  public void notifyInitializationCompleted(SessionInitContext initCtx) {
+    currentLock.lock();
+    try {
+      current.initResults.add(initCtx);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
+
+  /**
+   * Reopens the session. If additional files are supplied, the session is reopened in place
+   * via TezSessionPoolManager and returned as is; otherwise, a reopen request is queued for
+   * the WM thread and this call blocks until the request future is completed.
+   * @param session The session to reopen.
+   * @param conf The configuration used to create a session config if the session has none.
+   * @param additionalFiles The additional files for the session; may be null or empty.
+   * @return The session to use after the reopen.
+   * @throws Exception If the reopen fails or waiting for the result fails.
+   */
+  @Override
+  public TezSessionState reopen(TezSessionState session, Configuration conf,
+      String[] additionalFiles) throws Exception {
+    WmTezSession wmTezSession = ensureOwnedSession(session);
+    // NOTE(review): sessionConf is not read after this block - confirm whether it is still needed.
+    HiveConf sessionConf = wmTezSession.getConf();
+    if (sessionConf == null) {
+      LOG.warn("Session configuration is null for " + wmTezSession);
+      sessionConf = new HiveConf(conf, WorkloadManager.class);
+    }
+    // TODO: ideally, we should handle reopen the same way no matter what. However, the cases
+    //       with additional files will have to wait until HIVE-17827 is resolved, because it's
+    //       difficult to determine how the additionalFiles are to be propagated/reused between
+    //       two sessions. Once the update logic is encapsulated in the session we can remove this.
+    if (additionalFiles != null && additionalFiles.length > 0) {
+      TezSessionPoolManager.reopenInternal(session, additionalFiles);
+      return session;
+    }
+
+    SettableFuture<WmTezSession> future = SettableFuture.create();
+    currentLock.lock();
+    try {
+      if (current.toReopen.containsKey(wmTezSession)) {
+        throw new AssertionError("The session is being reopened more than once " + session);
+      }
+      current.toReopen.put(wmTezSession, future);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+    // Wait outside of the lock.
+    return future.get();
+  }
+
+  @Override
+  public void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception {
+    // By definition, this session is not in use and can no longer be in use, so it only
+    // affects the session pool. We can handle this inline.
+    tezAmPool.replaceSession(ensureOwnedSession(session), false, null);
+  }
+
+  // ======= VARIOUS UTILITY METHOD
+
+  /**
+   * Flags that there are changes to process and signals the waiters, unless the flag is
+   * already set. The callers in this class invoke it while holding currentLock.
+   */
+  private void notifyWmThreadUnderLock() {
+    if (!hasChanges) {
+      hasChanges = true;
+      hasChangesCondition.signalAll();
+    }
+  }
+
   private WmTezSession checkSessionForReuse(TezSessionState session) throws Exception {
     if (session == null) return null;
     WmTezSession result = null;
@@ -391,7 +1035,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       if (result.isOwnedBy(this)) {
         return result;
       }
-      // TODO: this should never happen, at least for now. Throw?
+      // This should never happen, at least for now. Throw?
       LOG.warn("Attempting to reuse a session not belonging to us: " + result);
       result.returnToSessionManager();
       return null;
@@ -405,15 +1049,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return null;
   }
 
-  private double updatePoolAllocations(List<WmTezSession> sessions, double totalFraction) {
-    // TODO: real implementation involving in-the-pool policy interface, etc.
-    double allocation = totalFraction / sessions.size();
-    for (WmTezSession session : sessions) {
-      session.setClusterFraction(allocation);
-    }
-    return totalFraction;
-  }
-
   private void validateConfig(HiveConf conf) throws HiveException {
     String queueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
     if ((queueName != null) && !queueName.isEmpty()) {
@@ -429,69 +1064,18 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  public void start() throws Exception {
-    sessions.startInitialSessions();
-    if (expirationTracker != null) {
-      expirationTracker.start();
-    }
-    allocationManager.start();
-  }
-
-  public void stop() throws Exception {
-    List<TezSessionPoolSession> sessionsToClose = null;
-    synchronized (openSessions) {
-      sessionsToClose = new ArrayList<>(openSessions.keySet());
-    }
-
-    for (TezSessionPoolSession sessionState : sessionsToClose) {
-      sessionState.close(false);
-    }
-
-    if (expirationTracker != null) {
-      expirationTracker.stop();
-    }
-    allocationManager.stop();
-
-    INSTANCE = null;
-  }
-
-  private WmTezSession createSession() {
-    WmTezSession session = createSessionObject(TezSessionState.makeSessionId());
+  /**
+   * Creates a new session object with a fresh session ID, assigns it the WM YARN queue and
+   * marks it as default.
+   * @param conf The configuration for the session; passed on to createSessionObject.
+   */
+  private WmTezSession createSession(HiveConf conf) {
+    WmTezSession session = createSessionObject(TezSessionState.makeSessionId(), conf);
     session.setQueueName(yarnQueue);
     session.setDefault();
-    LOG.info("Created new interactive session " + session.getSessionId());
+    LOG.info("Created new interactive session object " + session.getSessionId());
     return session;
   }
 
   @VisibleForTesting
-  protected WmTezSession createSessionObject(String sessionId) {
-    return new WmTezSession(sessionId, this, expirationTracker, new HiveConf(conf));
-  }
-
-  @Override
-  public void returnAfterUse(TezSessionPoolSession session) throws Exception {
-    returnAfterUse(session, true);
-  }
-
-  private void returnAfterUse(
-      TezSessionPoolSession session, boolean releaseParallelism) throws Exception {
-    boolean isInterrupted = Thread.interrupted();
-    try {
-      WmTezSession wmSession = ensureOwnedSession(session);
-      redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, releaseParallelism);
-      sessions.returnSession((WmTezSession) session);
-    } finally {
-      // Reset the interrupt status.
-      if (isInterrupted) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  /** Closes a running (expired) pool session and reopens it. */
-  @Override
-  public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
-    sessions.replaceSession(ensureOwnedSession(oldSession), createSession(), false, null, null);
+  protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
+    // If the caller did not supply a config, use a new HiveConf created from the WM config.
+    conf = (conf == null) ? new HiveConf(this.conf) : conf;
+    return new WmTezSession(sessionId, this, expirationTracker, conf);
   }
 
   private WmTezSession ensureOwnedSession(TezSessionState oldSession) {
@@ -506,9 +1090,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   @Override
   public void registerOpenSession(TezSessionPoolSession session) {
     synchronized (openSessions) {
-      openSessions.put(session, null);
+      openSessions.put(session, true);
     }
-    updateSessionsTriggers();
   }
 
   /** Called by TezSessionPoolSession when closed. */
@@ -517,20 +1100,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     synchronized (openSessions) {
       openSessions.remove(session);
     }
-    updateSessionsTriggers();
-  }
-
-  private void updateSessionsTriggers() {
-    if (sessionTriggerProvider != null) {
-      List<TezSessionState> openSessions = new ArrayList<>();
-      List<Trigger> activeTriggers = new ArrayList<>();
-      for (PoolState poolState : pools.values()) {
-        activeTriggers.addAll(poolState.getTriggers());
-        openSessions.addAll(poolState.sessions);
-      }
-      sessionTriggerProvider.setOpenSessions(Collections.unmodifiableList(openSessions));
-      sessionTriggerProvider.setActiveTriggers(Collections.unmodifiableList(activeTriggers));
-    }
   }
 
   @VisibleForTesting
@@ -538,41 +1107,25 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return expirationTracker;
   }
 
-  @Override
-  public TezSessionState reopen(TezSessionState session, Configuration conf,
-      String[] additionalFiles) throws Exception {
-    WmTezSession oldSession = ensureOwnedSession(session), newSession = createSession();
-    newSession.setPoolName(oldSession.getPoolName());
-    HiveConf sessionConf = session.getConf();
-    if (sessionConf == null) {
-      LOG.warn("Session configuration is null for " + session);
-      // default queue name when the initial session was created
-      sessionConf = new HiveConf(conf, WorkloadManager.class);
-    }
-    sessions.replaceSession(oldSession, newSession, true, additionalFiles, sessionConf);
-    // We are going to immediately give this session out, so ensure AM registry.
-    if (!ensureAmIsRegistered(newSession)) {
-      throw new Exception("Session is not usable after reopen");
-    }
-    // Do not release the parallelism - we are just replacing the session in the same pool.
-    redistributePoolAllocations(oldSession.getPoolName(), newSession, oldSession, false);
-    return newSession;
+  /** @return The initial size of the AM session pool. */
+  @VisibleForTesting
+  int getNumSessions() {
+    return tezAmPool.getInitialSize();
   }
 
-  @Override
-  public void destroy(TezSessionState session) throws Exception {
-    LOG.warn("Closing a pool session because of retry failure.");
-    // We never want to lose pool sessions. Replace it instead; al trigger duck redistribution.
-    WmTezSession wmSession = ensureOwnedSession(session);
-    closeAndReopenPoolSession(wmSession);
-    redistributePoolAllocations(wmSession.getPoolName(), null, wmSession, true);
+  /** @return The configuration held by this workload manager (not a copy). */
+  protected final HiveConf getConf() {
+    return conf;
   }
 
-  @VisibleForTesting
-  int getNumSessions() {
-    return sessions.getInitialSize();
+  /**
+   * Collects the names of the counters used by the active triggers.
+   * @return The counter names, one per active trigger; an empty list if there is no
+   *         session trigger provider.
+   */
+  public List<String> getTriggerCounterNames() {
+    List<String> counterNames = new ArrayList<>();
+    if (sessionTriggerProvider == null) {
+      // The provider is null-checked before use elsewhere in this class; do the same here
+      // instead of failing with an NPE.
+      return counterNames;
+    }
+    for (Trigger trigger : sessionTriggerProvider.getActiveTriggers()) {
+      counterNames.add(trigger.getExpression().getCounterLimit().getName());
+    }
+    return counterNames;
   }
 
+
   @Override
   SessionTriggerProvider getSessionTriggerProvider() {
     return sessionTriggerProvider;
@@ -588,38 +1141,319 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return triggerValidatorRunnable;
   }
 
-  @VisibleForTesting
-  public Map<String, PoolState> getPools() {
-    return pools;
+  /**
+   * State of a single pool.
+   * Unless otherwise specified, the members are only modified by the master thread.
+   */
+  private static class PoolState {
+    // Add stuff here as WM is implemented.
+    private final LinkedList<SessionInitContext> initializingSessions = new LinkedList<>();
+    // Note: the list is expected to be a few items; if it's longer we may want an IHM.
+    private final LinkedList<WmTezSession> sessions = new LinkedList<>();
+    private final LinkedList<GetRequest> queue = new LinkedList<>();
+
+    private final String fullName;
+    private double finalFraction;
+    private double finalFractionRemaining;
+    private int queryParallelism = -1;
+    private List<Trigger> triggers = new ArrayList<>();
+
+    /**
+     * Creates the state of a new pool that has no sessions and no queued requests.
+     * @param fullName The full name of the pool, including the names of the parent pools.
+     * @param queryParallelism The query parallelism of the pool.
+     * @param fraction The fraction of the cluster assigned to the pool.
+     */
+    public PoolState(String fullName, int queryParallelism, double fraction) {
+      this.fullName = fullName;
+      // Set the fields directly rather than via update(); a new pool has nothing to kill or
+      // requeue, and update() would dereference the event state that does not exist here.
+      this.finalFraction = this.finalFractionRemaining = fraction;
+      this.queryParallelism = queryParallelism;
+    }
+
+    /** @return The number of the sessions in the pool, including the initializing ones. */
+    public int getTotalActiveSessions() {
+      return sessions.size() + initializingSessions.size();
+    }
+
+    /**
+     * Applies the new settings to an existing pool. If the new parallelism is below the number
+     * of the active sessions, all the sessions of the pool are extracted to be killed. All the
+     * queued requests are moved to the front of the get requests of the event state.
+     * @param queryParallelism The new query parallelism.
+     * @param fraction The new fraction of the cluster; also resets the remaining fraction.
+     * @param toKill Receives the sessions to kill.
+     * @param e The event state of the current iteration.
+     */
+    public void update(int queryParallelism, double fraction,
+        List<WmTezSession> toKill, EventState e) {
+      this.finalFraction = this.finalFractionRemaining = fraction;
+      this.queryParallelism = queryParallelism;
+      // TODO: two possible improvements
+      //       1) Right now we kill all the queries here; we could just kill -qpDelta.
+      //       2) After the queries are killed queued queries would take their place.
+      //          If we could somehow restart queries we could instead put them at the front
+      //          of the queue (esp. in conjunction with (1)) and rerun them.
+      if (queryParallelism < getTotalActiveSessions()) {
+        extractAllSessionsToKill("The query pool was resized by administrator", e.toReuse, toKill);
+      }
+      // We will requeue, and not kill, the queries that are not running yet.
+      // Insert them all before the get requests from this iteration.
+      GetRequest req;
+      while ((req = queue.pollLast()) != null) {
+        e.getRequests.addFirst(req);
+      }
+    }
+
+    /**
+     * Empties the pool that is being removed: extracts all the sessions to be killed, and
+     * moves the queued requests to the front of the global queue.
+     * @param toKill Receives the sessions to kill.
+     * @param globalQueue Receives the queued requests of this pool, at the front.
+     * @param toReuse The pending reuse requests by session; those for the killed sessions
+     *                are removed and no longer reference the session.
+     */
+    public void destroy(List<WmTezSession> toKill, LinkedList<GetRequest> globalQueue,
+        IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+      extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill);
+      // All the pending get requests should just be requeued elsewhere.
+      // Note that we never queue session reuse so sessionToReuse would be null.
+      globalQueue.addAll(0, queue);
+      queue.clear();
+    }
+
+    /**
+     * Splits the remaining fraction of the pool equally between all the active sessions and
+     * sets the share on each session that is not initializing.
+     * @return The total fraction given out to the sessions that are not initializing;
+     *         0 if the pool has no active sessions.
+     */
+    public double updateAllocationPercentages() {
+      // TODO: real implementation involving in-the-pool policy interface, etc.
+      int totalSessions = getTotalActiveSessions();
+      if (totalSessions == 0) {
+        // Nothing to allocate to. Without this check, the division by zero below would
+        // make this method return NaN.
+        return 0;
+      }
+      double allocation = finalFractionRemaining / totalSessions;
+      for (WmTezSession session : sessions) {
+        session.setClusterFraction(allocation);
+      }
+      // Do not give out the capacity of the initializing sessions to the running ones;
+      // we expect init to be fast.
+      return finalFractionRemaining - allocation * initializingSessions.size();
+    }
+
+    @Override
+    public String toString() {
+      return "[" + fullName + ", query parallelism " + queryParallelism
+          + ", fraction of the cluster " + finalFraction + ", fraction used by child pools "
+          + (finalFraction - finalFractionRemaining) + ", active sessions " + sessions.size()
+          + ", initializing sessions " + initializingSessions.size() + "]";
+    }
+
+    /**
+     * Removes all the sessions, including the initializing ones, from the pool. The sessions
+     * that are already available are reset and added to toKill; the initializations that
+     * are still in progress are only canceled.
+     */
+    private void extractAllSessionsToKill(String killReason,
+        IdentityHashMap<WmTezSession, GetRequest> toReuse, List<WmTezSession> toKill) {
+      for (WmTezSession sessionToKill : sessions) {
+        resetRemovedSession(sessionToKill, killReason, toReuse);
+        toKill.add(sessionToKill);
+      }
+      sessions.clear();
+      for (SessionInitContext initCtx : initializingSessions) {
+        // It is possible that the background init thread has finished in parallel, queued
+        // the message for us but also returned the session to the user.
+        WmTezSession sessionToKill = initCtx.cancelAndExtractSessionIfDone(killReason);
+        if (sessionToKill == null) {
+          continue; // Async op in progress; the callback will take care of this.
+        }
+        resetRemovedSession(sessionToKill, killReason, toReuse);
+        toKill.add(sessionToKill);
+      }
+      initializingSessions.clear();
+    }
+
+    /**
+     * Marks the session as irrelevant for WM with the given reason, clears its WM state, and
+     * detaches it from the pending reuse request, if there is one.
+     */
+    private void resetRemovedSession(WmTezSession sessionToKill, String killReason,
+        IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+      assert killReason != null;
+      sessionToKill.setIsIrrelevantForWm(killReason);
+      sessionToKill.clearWm();
+      GetRequest req = toReuse.remove(sessionToKill);
+      if (req != null) {
+        req.sessionToReuse = null;
+      }
+    }
+
+    @VisibleForTesting
+    // will change in HIVE-17809
+    public void setTriggers(final List<Trigger> triggers) {
+      this.triggers = triggers;
+    }
+
+    /** @return The list of triggers held by this pool (not a copy). */
+    public List<Trigger> getTriggers() {
+      return triggers;
+    }
   }
 
-  protected final HiveConf getConf() {
-    return conf;
+
+  /** The stages of an async session initialization, as tracked by SessionInitContext. */
+  private enum SessionInitState {
+    GETTING, // We are getting a session from TezSessionPool
+    WAITING_FOR_REGISTRY, // We have the session but it doesn't have registry info yet.
+    DONE, // We have the session with registry info, or we have failed.
+    CANCELED // The master thread has CANCELED this and will never look at it again.
   }
 
-  public List<String> getTriggerCounterNames() {
-    List<Trigger> activeTriggers = sessionTriggerProvider.getActiveTriggers();
-    List<String> counterNames = new ArrayList<>();
-    for (Trigger trigger : activeTriggers) {
-      counterNames.add(trigger.getExpression().getCounterLimit().getName());
+  /**
+   * The class that serves as a synchronization point, and future callback,
+   * for async session initialization, as well as parallel cancellation.
+   */
+  private final class SessionInitContext implements FutureCallback<WmTezSession> {
+    private final String poolName;
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private WmTezSession session;
+    private SettableFuture<WmTezSession> future;
+    private SessionInitState state;
+    private String cancelReason;
+
+    public SessionInitContext(SettableFuture<WmTezSession> future, String poolName) {
+      this.state = SessionInitState.GETTING;
+      this.future = future;
+      this.poolName = poolName;
+    }
+
+    @Override
+    public void onSuccess(WmTezSession session) {
+      SessionInitState oldState;
+      SettableFuture<WmTezSession> future = null;
+      lock.lock();
+      try {
+        oldState = state;
+        switch (oldState) {
+        case GETTING: {
+          LOG.debug("Received a session from AM pool {}", session);
+          assert this.state == SessionInitState.GETTING;
+          session.setPoolName(poolName);
+          session.setQueueName(yarnQueue);
+          this.session = session;
+          this.state = SessionInitState.WAITING_FOR_REGISTRY;
+          break;
+        }
+        case WAITING_FOR_REGISTRY: {
+          assert this.session != null;
+          this.state = SessionInitState.DONE;
+          assert session == this.session;
+          future = this.future;
+          this.future = null;
+          break;
+        }
+        case CANCELED: {
+          future = this.future;
+          this.session = null;
+          this.future = null;
+          break;
+        }
+        default: {
+          future = this.future;
+          this.future = null;
+          break;
+        }
+        }
+      } finally {
+        lock.unlock();
+      }
+      switch (oldState) {
+      case GETTING: {
+        ListenableFuture<WmTezSession> waitFuture = session.waitForAmRegistryAsync(
+            amRegistryTimeoutMs, timeoutPool);
+        Futures.addCallback(waitFuture, this);
+        break;
+      }
+      case WAITING_FOR_REGISTRY: {
+        // Notify the master thread and the user.
+        future.set(session);
+        notifyInitializationCompleted(this);
+        break;
+      }
+      case CANCELED: {
+        // Return session to the pool; we can do it directly here.
+        future.setException(new HiveException(
+            "The query was killed by workload management: " + cancelReason));
+        session.setPoolName(null);
+        session.setClusterFraction(0f);
+        tezAmPool.returnSession(session);
+        break;
+      }
+      default: {
+        AssertionError error = new AssertionError("Unexpected state " + state);
+        future.setException(error);
+        throw error;
+      }
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      SettableFuture<WmTezSession> future;
+      WmTezSession session;
+      boolean wasCANCELED = false;
+      lock.lock();
+      try {
+        wasCANCELED = (state == SessionInitState.CANCELED);
+        session = this.session;
+        future = this.future;
+        this.future = null;
+        this.session = null;
+        if (!wasCANCELED) {
+          this.state = SessionInitState.DONE;
+        }
+      } finally {
+        lock.unlock();
+      }
+      future.setException(t);
+      if (!wasCANCELED) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Queueing the initialization failure with " + session);
+        }
+        notifyInitializationCompleted(this); // Report failure to the main thread.
+      }
+      if (session != null) {
+        session.clearWm();
+        // We can just restart the session if we have received one.
+        try {
+          tezAmPool.replaceSession(session, false, null);
+        } catch (Exception e) {
+          LOG.error("Failed to restart a failed session", e);
+        }
+      }
+    }
+
+    /** Cancel the async operation (even if it's done), and return the session if done. */
+    public WmTezSession cancelAndExtractSessionIfDone(String cancelReason) {
+      lock.lock();
+      try {
+        SessionInitState state = this.state;
+        this.state = SessionInitState.CANCELED;
+        this.cancelReason = cancelReason;
+        if (state == SessionInitState.DONE) {
+          WmTezSession result = this.session;
+          this.session = null;
+          return result;
+        } else {
+          // In the states where a background operation is in progress, wait for the callback.
+          // Also, ignore any duplicate calls; also don't kill failed ones - handled elsewhere.
+          if (state == SessionInitState.CANCELED) {
+            LOG.warn("Duplicate call to extract " + session);
+          }
+          return null;
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Extracts the session and cancels the operation, both only if done. */
+    public boolean extractSessionAndCancelIfDone(List<WmTezSession> results) {
+      lock.lock();
+      try {
+        if (state != SessionInitState.DONE) return false;
+        this.state = SessionInitState.CANCELED;
+        if (this.session != null) {
+          results.add(this.session);
+        } // Otherwise we have failed; the callback has taken care of the failure.
+        this.session = null;
+        return true;
+      } finally {
+        lock.unlock();
+      }
     }
-    return counterNames;
   }
 
 
+
   // TODO: temporary until real WM schema is created.
   public static class TmpHivePool {
     private final String name;
     private final List<TmpHivePool> children;
     private final int queryParallelism;
     private final double resourceFraction;
+    private final List<Trigger> triggers;
 
     public TmpHivePool(String name,
         List<TmpHivePool> children, int queryParallelism, double resourceFraction) {
+      this(name, children, queryParallelism, resourceFraction, null);
+    }
+
+    public TmpHivePool(String name,
+        List<TmpHivePool> children, int queryParallelism, double resourceFraction,
+        List<Trigger> triggers) {
       this.name = name;
       this.children = children;
       this.queryParallelism = queryParallelism;
       this.resourceFraction = resourceFraction;
+      this.triggers = triggers;
     }
 
     public String getName() {
@@ -679,4 +1513,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       return mappings;
     }
   }
+
+  @VisibleForTesting
+  TezSessionPool<WmTezSession> getTezAmPool() {
+    return tezAmPool;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
index 209cf57..2623a0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapClusterStateForCompile.java
@@ -92,23 +92,18 @@ public class LlapClusterStateForCompile {
     return numExecutorsPerNode;
   }
 
-  public boolean initClusterInfo() {
-    return initClusterInfo(true);
-  }
-
-  private boolean isUpdateNeeded(boolean allowUpdate) {
+  private boolean isUpdateNeeded() {
     Long lastUpdateLocal = lastClusterUpdateNs;
     if (lastUpdateLocal == null) return true;
-    if (!allowUpdate) return false;
     long elapsed = System.nanoTime() - lastUpdateLocal;
     return (elapsed >= updateIntervalNs);
   }
 
-  public boolean initClusterInfo(boolean allowUpdate) {
-    if (!isUpdateNeeded(allowUpdate)) return true;
+  public boolean initClusterInfo() {
+    if (!isUpdateNeeded()) return true;
     synchronized (updateInfoLock) {
       // At this point, no one will take the write lock and update, so we can do the last check.
-      if (!isUpdateNeeded(allowUpdate)) return true;
+      if (!isUpdateNeeded()) return true;
       if (svc == null) {
         try {
           svc = LlapRegistryService.getClient(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index 59efd43..5248454 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -19,14 +19,17 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 
-import java.util.Collection;
-import org.apache.hadoop.fs.Path;
+import com.google.common.util.concurrent.Futures;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
-import java.net.URISyntaxException;
-
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
 import javax.security.auth.login.LoginException;
-
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -45,6 +48,7 @@ public class SampleTezSessionState extends WmTezSession {
   private final HiveConf hiveConf;
   private String user;
   private boolean doAsEnabled;
+  private ListenableFuture<Boolean> waitForAmRegFuture;
 
   public SampleTezSessionState(
       String sessionId, TezSessionPoolSession.Manager parent, HiveConf conf) {
@@ -52,6 +56,13 @@ public class SampleTezSessionState extends WmTezSession {
         ? ((TezSessionPoolManager)parent).getExpirationTracker() : null, conf);
     this.sessionId = sessionId;
     this.hiveConf = conf;
+    waitForAmRegFuture = createDefaultWaitForAmRegistryFuture();
+  }
+
+  private SettableFuture<Boolean> createDefaultWaitForAmRegistryFuture() {
+    SettableFuture<Boolean> noWait = SettableFuture.create();
+    noWait.set(true); // By default, do not wait.
+    return noWait;
   }
 
   @Override
@@ -106,4 +117,27 @@ public class SampleTezSessionState extends WmTezSession {
   public boolean getDoAsEnabled() {
     return this.doAsEnabled;
   }
+
+  @Override
+  public SettableFuture<WmTezSession> waitForAmRegistryAsync(
+      int timeoutMs, ScheduledExecutorService timeoutPool) {
+    final SampleTezSessionState session = this;
+    final SettableFuture<WmTezSession> future = SettableFuture.create();
+    Futures.addCallback(waitForAmRegFuture, new FutureCallback<Boolean>() {
+      @Override
+      public void onSuccess(Boolean result) {
+        future.set(session);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        future.setException(t);
+      }
+    });
+    return future;
+  }
+
+  public void setWaitForAmRegistryFuture(ListenableFuture<Boolean> future) {
+    waitForAmRegFuture = future != null ? future : createDefaultWaitForAmRegistryFuture();
+  }
 }


[3/3] hive git commit: HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Posted by se...@apache.org.
HIVE-17841 : implement applying the resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/77b99e4c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/77b99e4c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/77b99e4c

Branch: refs/heads/master
Commit: 77b99e4c9e39b9905ff98a002ca35f7edd9b7b30
Parents: c5a9673
Author: sergey <se...@apache.org>
Authored: Thu Nov 2 11:30:38 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Nov 2 11:30:38 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    3 +
 .../hive/jdbc/TestTriggersWorkloadManager.java  |   12 +-
 .../hadoop/hive/ql/exec/tez/AmPluginNode.java   |    2 +-
 .../ql/exec/tez/GuaranteedTasksAllocator.java   |   29 +-
 .../exec/tez/LlapPluginEndpointClientImpl.java  |    2 +-
 .../ql/exec/tez/QueryAllocationManager.java     |    3 +-
 .../ql/exec/tez/SessionExpirationTracker.java   |   16 +-
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java |  371 ++++-
 .../hive/ql/exec/tez/TezSessionPoolManager.java |   74 +-
 .../hive/ql/exec/tez/TezSessionPoolSession.java |   26 +-
 .../hive/ql/exec/tez/TezSessionState.java       |    2 +-
 .../hive/ql/exec/tez/UserPoolMapping.java       |   16 +-
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   |  109 +-
 .../hive/ql/exec/tez/WorkloadManager.java       | 1549 ++++++++++++++----
 .../physical/LlapClusterStateForCompile.java    |   13 +-
 .../hive/ql/exec/tez/SampleTezSessionState.java |   44 +-
 .../hive/ql/exec/tez/TestWorkloadManager.java   |  365 ++++-
 17 files changed, 2093 insertions(+), 543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a3c853a..b65bdab 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2419,6 +2419,9 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE("hive.server2.tez.interactive.queue", "",
         "A single YARN queues to use for Hive Interactive sessions. When this is specified,\n" +
         "workload management is enabled and used for these sessions."),
+    HIVE_SERVER2_TEZ_WM_WORKER_THREADS("hive.server2.tez.wm.worker.threads", 4,
+        "Number of worker threads to use to perform the synchronous operations with Tez\n" +
+        "sessions for workload management (e.g. opening, closing, etc.)"),
     HIVE_SERVER2_TEZ_WM_AM_REGISTRY_TIMEOUT("hive.server2.tez.wm.am.registry.timeout", "30s",
         new TimeValidator(TimeUnit.SECONDS),
         "The timeout for AM registry registration, after which (on attempting to use the\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
index fdb660a..0ec7e85 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestTriggersWorkloadManager.java
@@ -16,17 +16,21 @@
 
 package org.apache.hive.jdbc;
 
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpHivePool;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpResourcePlan;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMapping;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager.TmpUserMappingType;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import org.apache.hive.jdbc.miniHS2.MiniHS2.MiniClusterType;
@@ -70,6 +74,10 @@ public class TestTriggersWorkloadManager extends TestTriggersTezSessionPoolManag
   @Override
   protected void setupTriggers(final List<Trigger> triggers) throws Exception {
     WorkloadManager wm = WorkloadManager.getInstance();
-    wm.getPools().get("llap").setTriggers(triggers);
+    TmpResourcePlan rp = new TmpResourcePlan(Lists.newArrayList(new TmpHivePool(
+        "llap", null, 1, 1.0f, triggers)), Lists.newArrayList(
+            new TmpUserMapping(TmpUserMappingType.DEFAULT, "", "llap", 1)));
+    wm.updateResourcePlanAsync(rp).get(10, TimeUnit.SECONDS);
   }
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
index 35d380c..e4a21cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/AmPluginNode.java
@@ -39,5 +39,5 @@ public interface AmPluginNode {
     }
   }
 
-  AmPluginInfo waitForAmPluginInfo(int timeoutMs) throws InterruptedException, TimeoutException;
+  AmPluginInfo getAmPluginInfo() throws InterruptedException, TimeoutException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index d978a25..53dd698 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -64,6 +64,8 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
 
   @Override
   public void start() {
+    // Try to get cluster information once, to avoid immediate cluster-update event in WM.
+    clusterState.initClusterInfo();
     clusterStateUpdateThread.start();
   }
 
@@ -72,9 +74,13 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
     clusterStateUpdateThread.interrupt(); // Don't wait for the thread.
   }
 
+  public void initClusterInfo() {
+    clusterState.initClusterInfo();
+  }
+
   @VisibleForTesting
   protected int getExecutorCount(boolean allowUpdate) {
-    if (!clusterState.initClusterInfo(allowUpdate)) {
+    if (allowUpdate && !clusterState.initClusterInfo()) {
       LOG.warn("Failed to get LLAP cluster information for "
           + HiveConf.getTrimmedVar(this.conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS)
           + "; we may rely on outdated cluster status");
@@ -92,15 +98,18 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
   }
 
   @Override
-  public void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
-    // Do not make a remote call unless we have no information at all.
+  public void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessionsToUpdate) {
+    // Do not make a remote call under any circumstances - this is supposed to be async.
     int totalCount = getExecutorCount(false);
-    int totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+    int totalToDistribute = -1;
+    if (totalMaxAlloc != null) {
+      totalToDistribute = (int)Math.round(totalCount * totalMaxAlloc);
+    }
     double lastDelta = 0;
     for (int i = 0; i < sessionsToUpdate.size(); ++i) {
       WmTezSession session = sessionsToUpdate.get(i);
       int intAlloc = -1;
-      if (i + 1 == sessionsToUpdate.size()) {
+      if (i + 1 == sessionsToUpdate.size() && totalToDistribute >= 0) {
         intAlloc = totalToDistribute;
         // We rely on the caller to supply a reasonable total; we could log a warning
         // if this doesn't match the allocation of the last session beyond some threshold.
@@ -119,10 +128,12 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
         intAlloc = (int)roundedAlloc;
       }
       // Make sure we don't give out more than allowed due to double/rounding artifacts.
-      if (intAlloc > totalToDistribute) {
-        intAlloc = totalToDistribute;
+      if (totalToDistribute >= 0) {
+        if (intAlloc > totalToDistribute) {
+          intAlloc = totalToDistribute;
+        }
+        totalToDistribute -= intAlloc;
       }
-      totalToDistribute -= intAlloc;
       // This will only send update if it's necessary.
       updateSessionAsync(session, intAlloc);
     }
@@ -162,7 +173,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
       // RPC already handles retries, so we will just try to kill the session here.
       // This will cause the current query to fail. We could instead keep retrying.
       try {
-        session.destroy();
+        session.handleUpdateError();
       } catch (Exception e) {
         LOG.error("Failed to kill the session " + session);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
index 45c3e38..4f373162 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/LlapPluginEndpointClientImpl.java
@@ -114,7 +114,7 @@ public class LlapPluginEndpointClientImpl extends
 
     private void ensureInfo() throws InterruptedException, TimeoutException {
       if (info != null) return;
-      info = node.waitForAmPluginInfo(0); // Don't wait - should already be initialized.
+      info = node.getAmPluginInfo(); // Don't wait - should already be initialized.
       if (info == null) {
         throw new AssertionError("A request was created without AM plugin info for " + node);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
index a326db3..acacfd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/QueryAllocationManager.java
@@ -26,7 +26,8 @@ interface QueryAllocationManager {
    * Updates the session allocations asynchoronously.
    * @param totalMaxAlloc The total maximum fraction of the cluster to allocate. Used to
    *                      avoid various artifacts, esp. with small numbers and double weirdness.
+   *                      Null means the total is unknown.
    * @param sessions Sessions to update based on their allocation fraction.
    */
-  void updateSessionsAsync(double totalMaxAlloc, List<WmTezSession> sessions);
+  void updateSessionsAsync(Double totalMaxAlloc, List<WmTezSession> sessions);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
index da93a3a..77ff484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
@@ -48,7 +48,7 @@ class SessionExpirationTracker {
   private volatile SessionState initSessionState;
 
   interface RestartImpl {
-    void closeAndReopenPoolSession(TezSessionPoolSession session) throws Exception;
+    void closeAndReopenExpiredSession(TezSessionPoolSession session) throws Exception;
   }
 
   public static SessionExpirationTracker create(HiveConf conf, RestartImpl restartImpl) {
@@ -114,7 +114,7 @@ class SessionExpirationTracker {
         TezSessionPoolSession next = restartQueue.take();
         LOG.info("Restarting the expired session [" + next + "]");
         try {
-          sessionRestartImpl.closeAndReopenPoolSession(next);
+          sessionRestartImpl.closeAndReopenExpiredSession(next);
         } catch (InterruptedException ie) {
           throw ie;
         } catch (Exception e) {
@@ -225,12 +225,12 @@ class SessionExpirationTracker {
     expirationQueue.remove(session);
   }
 
+  public void closeAndRestartExpiredSessionAsync(TezSessionPoolSession session) {
+    restartQueue.add(session);
+  }
+
   public void closeAndRestartExpiredSession(
-      TezSessionPoolSession session, boolean isAsync) throws Exception {
-    if (isAsync) {
-      restartQueue.add(session);
-    } else {
-      sessionRestartImpl.closeAndReopenPoolSession(session);
-    }
+      TezSessionPoolSession session) throws Exception {
+    sessionRestartImpl.closeAndReopenExpiredSession(session);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index b67c933..9c2b3ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -17,17 +17,25 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Queue;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -45,35 +53,48 @@ import org.slf4j.LoggerFactory;
 class TezSessionPool<SessionType extends TezSessionPoolSession> {
   private static final Logger LOG = LoggerFactory.getLogger(TezSessionPool.class);
 
-  /** A queue for initial sessions that have not been started yet. */
-  private final Queue<SessionType> initialSessions =
-      new ConcurrentLinkedQueue<SessionType>();
+  public interface SessionObjectFactory<SessionType> {
+    SessionType create(SessionType oldSession);
+  }
 
   private final HiveConf initConf;
-  private int initialSize;
+  private int initialSize = 0; // For testing only.
+  private final SessionObjectFactory<SessionType> sessionObjFactory;
 
-  // TODO: eventually, this will need to support resize. That would probably require replacement
-  //       with a RW lock, a semaphore and linked list.
-  private BlockingDeque<SessionType> defaultQueuePool;
+  private final ReentrantLock poolLock = new ReentrantLock(true);
+  private final Condition notEmpty = poolLock.newCondition();
+  private final LinkedList<SessionType> pool = new LinkedList<>();
+  private final LinkedList<SettableFuture<SessionType>> asyncRequests = new LinkedList<>();
+  /**
+   * The number of sessions that need to be started or killed because of the resize calls on
+   * the pool. When increasing the size, we set this to a positive number and start new sessions
+   * on background threads, gradually bringing it back to 0.
+   * When decreasing the size, we try to kill as many existing sessions as we can; if that is
+   * not enough because the sessions are in use or being restarted, we kill them as they are
+   * re-added to the pool.
+   * Repeated calls to resize adjust the delta to ensure correctness between different resizes.
+   */
+  private final AtomicInteger deltaRemaining = new AtomicInteger();
 
   private final String amRegistryName;
   private final TezAmRegistryImpl amRegistry;
 
   private final ConcurrentHashMap<String, SessionType> bySessionId =
       new ConcurrentHashMap<>();
+  // Preserved at initialization time to have a session to use during resize.
+  // TODO: rather, Tez sessions should not depend on SessionState.
+  private SessionState parentSessionState;
 
-
-  TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent) {
+  TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent,
+      SessionObjectFactory<SessionType> sessionFactory) {
     this.initConf = initConf;
-    assert numSessionsTotal > 0;
-    defaultQueuePool = new LinkedBlockingDeque<>(numSessionsTotal);
+    this.initialSize = numSessionsTotal;
     this.amRegistry = useAmRegistryIfPresent ? TezAmRegistryImpl.create(initConf, true) : null;
     this.amRegistryName = amRegistry == null ? null : amRegistry.getRegistryName();
+    this.sessionObjFactory = sessionFactory;
   }
 
-  void startInitialSessions() throws Exception {
-    initialSize = initialSessions.size();
-    if (initialSessions.isEmpty()) return;
+  void start() throws Exception {
     if (amRegistry != null) {
       amRegistry.start();
       amRegistry.initializeWithoutRegistering();
@@ -82,82 +103,145 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       amRegistry.populateCache(true);
     }
 
-    int threadCount = Math.min(initialSessions.size(),
+    int threadCount = Math.min(initialSize,
         HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
     Preconditions.checkArgument(threadCount > 0);
     if (threadCount == 1) {
-      while (true) {
-        SessionType session = initialSessions.poll();
+      for (int i = 0; i < initialSize; ++i) {
+        SessionType session = sessionObjFactory.create(null);
         if (session == null) break;
         startInitialSession(session);
       }
     } else {
-      final SessionState parentSessionState = SessionState.get();
-      // The runnable has no mutable state, so each thread can run the same thing.
-      final AtomicReference<Exception> firstError = new AtomicReference<>(null);
-      Runnable runnable = new Runnable() {
-        public void run() {
-          if (parentSessionState != null) {
-            SessionState.setCurrentSessionState(parentSessionState);
-          }
-          while (true) {
-            SessionType session = initialSessions.poll();
-            if (session == null) break;
-            if (firstError.get() != null) break; // Best-effort.
-            try {
-              startInitialSession(session);
-            } catch (Exception e) {
-              if (!firstError.compareAndSet(null, e)) {
-                LOG.error("Failed to start session; ignoring due to previous error", e);
-              }
-              break;
-            }
-          }
+      final AtomicInteger remaining = new AtomicInteger(initialSize);
+      this.parentSessionState = SessionState.get();
+      @SuppressWarnings("unchecked")
+      FutureTask<Boolean>[] threadTasks = new FutureTask[threadCount];
+      for (int i = threadTasks.length - 1; i >= 0; --i) {
+        threadTasks[i] = new FutureTask<Boolean>(new CreateSessionsRunnable(remaining));
+        if (i == 0) {
+          // Start is blocking, so run one of the tasks on the main thread.
+          threadTasks[i].run();
+        } else {
+          new Thread(threadTasks[i], "Tez session init " + i).start();
         }
-      };
-      Thread[] threads = new Thread[threadCount - 1];
-      for (int i = 0; i < threads.length; ++i) {
-        threads[i] = new Thread(runnable, "Tez session init " + i);
-        threads[i].start();
-      }
-      runnable.run();
-      for (int i = 0; i < threads.length; ++i) {
-        threads[i].join();
       }
-      Exception ex = firstError.get();
-      if (ex != null) {
-        throw ex;
+      for (int i = 0; i < threadTasks.length; ++i) {
+        threadTasks[i].get();
       }
     }
   }
 
-  void addInitialSession(SessionType session) {
-    initialSessions.add(session);
-  }
-
   SessionType getSession() throws Exception {
     while (true) {
-      SessionType result = defaultQueuePool.take();
-      if (result.tryUse()) return result;
+      SessionType result = null;
+      poolLock.lock();
+      try {
+        while ((result = pool.poll()) == null) {
+          notEmpty.await(100, TimeUnit.MILLISECONDS);
+        }
+      } finally {
+        poolLock.unlock();
+      }
+      if (result.tryUse(false)) return result;
       LOG.info("Couldn't use a session [" + result + "]; attempting another one");
     }
   }
 
-  void returnSession(SessionType session) throws Exception {
+  ListenableFuture<SessionType> getSessionAsync() throws Exception {
+    SettableFuture<SessionType> future = SettableFuture.create();
+    poolLock.lock();
+    try {
+      // Try to get the session quickly.
+      while (true) {
+        SessionType result = pool.poll();
+        if (result == null) break;
+        if (!result.tryUse(false)) continue;
+        future.set(result);
+        return future;
+      }
+      // The pool is empty; queue the request.
+      asyncRequests.add(future);
+      return future;
+    } finally {
+      poolLock.unlock();
+    }
+  }
+
+  void returnSession(SessionType session) {
+    returnSessionInternal(session, false);
+  }
+
+  boolean returnSessionAsync(SessionType session) {
+    return returnSessionInternal(session, true);
+  }
+
+  private boolean returnSessionInternal(SessionType session, boolean isAsync) {
     // Make sure that if the session is returned to the pool, it doesn't live in the global.
     SessionState sessionState = SessionState.get();
     if (sessionState != null) {
       sessionState.setTezSession(null);
     }
-    if (session.stopUsing()) {
-      defaultQueuePool.putFirst(session);
+    if (!session.stopUsing()) return true; // The session will be restarted and return to us.
+    boolean canPutBack = putSessionBack(session, true);
+    if (canPutBack) return true;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Closing an unneeded returned session " + session);
+    }
+
+    if (isAsync) return false; // The caller is responsible for destroying the session.
+    try {
+      session.close(false);
+    } catch (Exception ex) {
+      LOG.error("Failed to close " + session, ex);
+    }
+    return true;
+  }
+
+  /**
+   * Puts session back into the pool.
+   * @return true if the session has been put back; false if it's not needed and should be killed.
+   */
+  private boolean putSessionBack(SessionType session, boolean isFirst) {
+    SettableFuture<SessionType> future = null;
+    poolLock.lock();
+    try {
+      // See if we need to kill some sessions because the pool was resized down while
+      // a bunch of sessions were outstanding. See also deltaRemaining javadoc.
+      while (true) {
+        int remainingToKill = -deltaRemaining.get();
+        if (remainingToKill <= 0) break; // No need to kill anything.
+        if (deltaRemaining.compareAndSet(-remainingToKill, -remainingToKill + 1)) {
+          return false;
+        }
+      }
+      // If there are async requests, satisfy them first.
+      if (!asyncRequests.isEmpty() && session.tryUse(false)) {
+        future = asyncRequests.poll();
+      }
+      if (future == null) {
+        // Put session into the pool.
+        if (isFirst) {
+          pool.addFirst(session);
+        } else {
+          pool.addLast(session);
+        }
+        notEmpty.signalAll();
+      }
+    } finally {
+      poolLock.unlock();
     }
+    if (future != null) {
+      future.set(session);
+    }
+    return true;
   }
 
-  void replaceSession(SessionType oldSession, SessionType newSession,
-      boolean keepTmpDir, String[] additionalFilesArray, HiveConf conf) throws Exception {
+  void replaceSession(SessionType oldSession, boolean keepTmpDir,
+      String[] additionalFilesArray) throws Exception {
     // Retain the stuff from the old session.
     // Re-setting the queue config is an old hack that we may remove in future.
+    SessionType newSession = sessionObjFactory.create(oldSession);
     Path scratchDir = oldSession.getTezScratchDir();
     String queueName = oldSession.getQueueName();
     Set<String> additionalFiles = null;
@@ -171,12 +255,16 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
     }
     try {
       oldSession.close(keepTmpDir);
-      boolean wasRemoved = defaultQueuePool.remove(oldSession);
-      if (!wasRemoved) {
-        LOG.error("Old session was closed but it was not in the pool", oldSession);
+    } finally {
+      poolLock.lock();
+      try {
+        // The expiring session may or may not be in the pool.
+        pool.remove(oldSession);
+      } finally {
+        poolLock.unlock();
       }
+
       bySessionId.remove(oldSession.getSessionId());
-    } finally {
       // There's some bogus code that can modify the queue name. Force-set it for pool sessions.
       // TODO: this might only be applicable to TezSessionPoolManager; try moving it there?
       newSession.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, queueName);
@@ -184,18 +272,36 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       // registry again just in case. TODO: maybe we should enforce that.
       configureAmRegistry(newSession);
       newSession.open(additionalFiles, scratchDir);
-      defaultQueuePool.put(newSession);
+      if (!putSessionBack(newSession, false)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing an unneeded session " + newSession
+              + "; trying to replace " + oldSession);
+        }
+        try {
+          newSession.close(false);
+        } catch (Exception ex) {
+          LOG.error("Failed to close an unneeded session", ex);
+        }
+      }
     }
   }
 
+
   private void startInitialSession(SessionType session) throws Exception {
-    boolean isUsable = session.tryUse();
+    boolean isUsable = session.tryUse(true);
     if (!isUsable) throw new IOException(session + " is not usable at pool startup");
     session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, session.getQueueName());
     configureAmRegistry(session);
     session.open();
     if (session.stopUsing()) {
-      defaultQueuePool.put(session);
+      if (!putSessionBack(session, false)) {
+        LOG.warn("Couldn't add a session during initialization");
+        try {
+          session.close(false);
+        } catch (Exception ex) {
+          LOG.error("Failed to close an unneeded session", ex);
+        }
+      }
     }
   }
 
@@ -230,7 +336,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
     @Override
     public void onUpdate(TezAmInstance serviceInstance) {
       // Presumably we'd get those later if AM updates its stuff.
-      LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance);
+      LOG.info("Received an unexpected update for instance={}. Ignoring", serviceInstance);
     }
 
     @Override
@@ -239,11 +345,126 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       // For now, we don't take any action. In future, we might restore the session based
       // on this and get rid of the logic outside of the pool that replaces/reopens/etc.
       LOG.warn("AM for " + sessionId + " has disappeared from the registry");
+      // TODO: this might race if AM for the same session is restarted internally by Tez.
+      //        It is possible to receive the create before remove and remove the wrong one.
+      //        We need some identity in the value to make sure that doesn't happen.
       bySessionId.remove(sessionId);
     }
   }
 
+  @VisibleForTesting
   int getInitialSize() {
     return initialSize;
   }
+
+  /**
+   * Resizes the pool asynchronously; a delta of zero returns an already-completed future.
+   * @param delta The number of sessions to add (if positive) or remove (if negative).
+   * @param toClose Output list that receives newly-unneeded sessions, to be closed by the caller.
+   */
+  public ListenableFuture<?> resizeAsync(int delta, List<SessionType> toClose) {
+    if (delta == 0) return createDummyFuture();
+    poolLock.lock(); // resizeDownInternal polls the pool directly, so hold the lock here.
+    try {
+      if (delta < 0) {
+        return resizeDownInternal(-delta, toClose);
+      } else {
+        return resizeUpInternal(delta);
+      }
+    } finally {
+      poolLock.unlock();
+    }
+  }
+
+  private ListenableFuture<?> resizeUpInternal(int delta) {
+    // 1) Cancel the kills if any, to avoid killing the returned sessions.
+    //    Also sets the count for the async initialization.
+    int oldVal;
+    do {
+      oldVal = deltaRemaining.get();
+    } while (!deltaRemaining.compareAndSet(oldVal, oldVal + delta));
+    int toStart = oldVal + delta;
+    if (toStart <= 0) return createDummyFuture();
+    LOG.info("Resizing the pool; adding " + toStart + " sessions");
+
+    // 2) If we need to create some extra sessions, we'd do it just like startup does.
+    int threadCount = Math.max(1, Math.min(toStart,
+        HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)));
+    List<ListenableFutureTask<Boolean>> threadTasks = new ArrayList<>(threadCount);
+    // This is an async method, so always launch threads, even for a single task.
+    for (int i = 0; i < threadCount; ++i) {
+      ListenableFutureTask<Boolean> task = ListenableFutureTask.create(
+          new CreateSessionsRunnable(deltaRemaining));
+      new Thread(task, "Tez pool resize " + i).start();
+      threadTasks.add(task);
+    }
+    return Futures.allAsList(threadTasks);
+  }
+
+  private ListenableFuture<Boolean> resizeDownInternal(int delta, List<SessionType> toClose) {
+    // 1) Cancel the previous expansion, if any.
+    while (true) {
+      int expansionCount = deltaRemaining.get();
+      if (expansionCount <= 0) break;
+      int expansionCancelled = Math.min(expansionCount, delta);
+      if (deltaRemaining.compareAndSet(expansionCount, expansionCount - expansionCancelled)) {
+        delta -= expansionCancelled;
+        break;
+      }
+    }
+    // 2) Drain unused sessions; the close() is sync so delegate to the caller.
+    while (delta > 0) {
+      SessionType session = pool.poll();
+      if (session == null) break;
+      if (!session.tryUse(true)) continue;
+      toClose.add(session);
+      --delta;
+    }
+    // 3) If too many sessions are outstanding (e.g. due to expiration restarts - should
+    //    not happen with in-use sessions because WM already kills the extras), we will kill
+    //    them as they come back from restarts.
+    if (delta > 0) {
+      int oldVal;
+      do {
+        oldVal = deltaRemaining.get();
+      } while (!deltaRemaining.compareAndSet(oldVal, oldVal - delta));
+    }
+    return createDummyFuture();
+  }
+
+  private ListenableFuture<Boolean> createDummyFuture() {
+    SettableFuture<Boolean> f = SettableFuture.create();
+    f.set(true);
+    return f;
+  }
+
+  private final class CreateSessionsRunnable implements Callable<Boolean> {
+    private final AtomicInteger remaining;
+
+    private CreateSessionsRunnable(AtomicInteger remaining) {
+      this.remaining = remaining;
+    }
+
+    public Boolean call() throws Exception {
+      if (parentSessionState != null) {
+        SessionState.setCurrentSessionState(parentSessionState);
+      }
+      while (true) {
+        int oldVal = remaining.get();
+        if (oldVal <= 0) return true;
+        if (!remaining.compareAndSet(oldVal, oldVal - 1)) continue;
+        startInitialSession(sessionObjFactory.create(null));
+      }
+    }
+  }
+
+  @VisibleForTesting
+  int getCurrentSize() {
+    poolLock.lock(); // Must really hold the lock: the finally below unlocks unconditionally.
+    try {
+      return pool.size();
+    } finally {
+      poolLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 9b4714f..15543d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import javax.security.auth.login.LoginException;
+import org.apache.tez.dag.api.TezException;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -102,7 +107,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
 
   public void startPool() throws Exception {
     if (defaultSessionPool != null) {
-      defaultSessionPool.startInitialSessions();
+      defaultSessionPool.start();
     }
     if (expirationTracker != null) {
       expirationTracker.start();
@@ -110,7 +115,7 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
   }
 
   public void setupPool(HiveConf conf) throws Exception {
-    String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
+    final String[] defaultQueueList = HiveConf.getTrimmedStringsVar(
         conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
     this.initConf = conf;
     int emptyNames = 0; // We don't create sessions for empty entries.
@@ -123,7 +128,37 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames);
     if (numSessionsTotal > 0) {
       boolean enableAmRegistry = false;
-      defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry);
+      defaultSessionPool = new TezSessionPool<>(initConf, numSessionsTotal, enableAmRegistry,
+          new TezSessionPool.SessionObjectFactory<TezSessionPoolSession>() {
+            int queueIx = 0;
+
+            @Override
+            public TezSessionPoolSession create(TezSessionPoolSession oldSession) {
+              if (oldSession != null) {
+                return createAndInitSession(
+                    oldSession.getQueueName(), oldSession.isDefault(), oldSession.getConf());
+              }
+              // We never resize the pool, so assume this is initialization.
+              // If that changes, we might have to make the factory interface more complicated.
+              /*
+               * In a single-threaded init case, with this the ordering of sessions in the queue will be
+               * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 thereby ensuring uniform
+               * distribution of the sessions across queues at least to begin with. Then as sessions get
+               * freed up, the list may change this ordering.
+               * In a multi threaded init case it's a free for all.
+               */
+              int localQueueIx;
+              synchronized (defaultQueueList) {
+                localQueueIx = queueIx;
+                ++queueIx;
+                if (queueIx == defaultQueueList.length) {
+                  queueIx = 0;
+                }
+              }
+              HiveConf sessionConf = new HiveConf(initConf);
+              return createAndInitSession(defaultQueueList[localQueueIx], true, sessionConf);
+          }
+      });
     }
 
     numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
@@ -155,24 +190,6 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     if (!hasInitialSessions) {
       return;
     }
-
-
-    /*
-     * In a single-threaded init case, with this the ordering of sessions in the queue will be
-     * (with 2 sessions 3 queues) s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform
-     * distribution of the sessions across queues at least to begin with. Then as sessions get
-     * freed up, the list may change this ordering.
-     * In a multi threaded init case it's a free for all.
-     */
-    for (int i = 0; i < numSessions; i++) {
-      for (String queueName : defaultQueueList) {
-        if (queueName.isEmpty()) {
-          continue;
-        }
-        HiveConf sessionConf = new HiveConf(initConf);
-        defaultSessionPool.addInitialSession(createAndInitSession(queueName, true, sessionConf));
-      }
-    }
   }
 
   // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration
@@ -438,7 +455,14 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
         && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
       sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
     }
+    reopenInternal(sessionState, additionalFiles);
+    return sessionState;
+  }
+
+  static void reopenInternal(
+      TezSessionState sessionState, String[] additionalFiles) throws Exception {
     Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf();
+    // TODO: implies the session files and the array are the same if not null; why? very brittle
     if ((oldAdditionalFiles == null || oldAdditionalFiles.isEmpty())
         && (additionalFiles != null)) {
       oldAdditionalFiles = new HashSet<>();
@@ -449,11 +473,11 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
     // TODO: close basically resets the object to a bunch of nulls.
     //       We should ideally not reuse the object because it's pointless and error-prone.
     sessionState.close(true);
-    // TODO: should we reuse scratchDir too?
+    // Note: scratchdir is reused implicitly because the sessionId is the same.
     sessionState.open(oldAdditionalFiles, null);
-    return sessionState;
   }
 
+
   public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
     List<TezSessionPoolSession> sessionsToClose = null;
     synchronized (openSessions) {
@@ -467,14 +491,14 @@ public class TezSessionPoolManager extends TezSessionPoolSession.AbstractTrigger
 
   /** Closes a running (expired) pool session and reopens it. */
   @Override
-  public void closeAndReopenPoolSession(TezSessionPoolSession oldSession) throws Exception {
+  public void closeAndReopenExpiredSession(TezSessionPoolSession oldSession) throws Exception {
     String queueName = oldSession.getQueueName();
     if (queueName == null) {
       LOG.warn("Pool session has a null queue: " + oldSession);
     }
     TezSessionPoolSession newSession = createAndInitSession(
       queueName, oldSession.isDefault(), oldSession.getConf());
-    defaultSessionPool.replaceSession(oldSession, newSession, false, null, null);
+    defaultSessionPool.replaceSession(oldSession, false, null);
   }
 
   /** Called by TezSessionPoolSession when opened. */

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 6135223..6887d7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+
+import org.apache.hadoop.conf.Configuration;
+
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Collection;
@@ -139,28 +145,28 @@ class TezSessionPoolSession extends TezSessionState {
    * Tries to use this session. When the session is in use, it will not expire.
    * @return true if the session can be used; false if it has already expired.
    */
-  public boolean tryUse() throws Exception {
+  public boolean tryUse(boolean ignoreExpiration) {
     while (true) {
       int oldValue = sessionState.get();
       if (oldValue == STATE_IN_USE) throw new AssertionError(this + " is already in use");
       if (oldValue == STATE_EXPIRED) return false;
-      int finalState = shouldExpire() ? STATE_EXPIRED : STATE_IN_USE;
+      int finalState = (!ignoreExpiration && shouldExpire()) ? STATE_EXPIRED : STATE_IN_USE;
       if (sessionState.compareAndSet(STATE_NONE, finalState)) {
         if (finalState == STATE_IN_USE) return true;
         // Restart asynchronously, don't block the caller.
-        expirationTracker.closeAndRestartExpiredSession(this, true);
+        expirationTracker.closeAndRestartExpiredSessionAsync(this);
         return false;
       }
     }
   }
 
-  boolean stopUsing() throws Exception {
+  boolean stopUsing() {
     int finalState = shouldExpire() ? STATE_EXPIRED : STATE_NONE;
     if (!sessionState.compareAndSet(STATE_IN_USE, finalState)) {
       throw new AssertionError("Unexpected state change; currently " + sessionState.get());
     }
     if (finalState == STATE_NONE) return true;
-    expirationTracker.closeAndRestartExpiredSession(this, true);
+    expirationTracker.closeAndRestartExpiredSessionAsync(this);
     return false;
   }
 
@@ -176,13 +182,17 @@ class TezSessionPoolSession extends TezSessionState {
     while (true) {
       if (sessionState.get() != STATE_NONE) return true; // returnAfterUse will take care of this
       if (sessionState.compareAndSet(STATE_NONE, STATE_EXPIRED)) {
-        expirationTracker.closeAndRestartExpiredSession(this, isAsync);
+        if (isAsync) {
+          expirationTracker.closeAndRestartExpiredSessionAsync(this);
+        } else {
+          expirationTracker.closeAndRestartExpiredSession(this);
+        }
         return true;
       }
     }
   }
 
-  private boolean shouldExpire() {
+  private final boolean shouldExpire() {
     return expirationNs != null && (System.nanoTime() - expirationNs) >= 0;
   }
 
@@ -209,4 +219,4 @@ class TezSessionPoolSession extends TezSessionState {
   void updateFromRegistry(TezAmInstance si) {
     // Nothing to do.
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 1448168..541ee73 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -629,7 +629,7 @@ public class TezSessionState {
     }
   }
 
-  public void cleanupScratchDir () throws IOException {
+  protected final void cleanupScratchDir () throws IOException {
     FileSystem fs = tezScratchDir.getFileSystem(conf);
     fs.delete(tezScratchDir, true);
     tezScratchDir = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
index 81d6b85..96dc7d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/UserPoolMapping.java
@@ -41,13 +41,12 @@ class UserPoolMapping {
   private final String defaultPoolName;
   // TODO: add other types as needed
 
-  public UserPoolMapping(List<TmpUserMapping> mappings, Set<String> poolNames) {
+  public UserPoolMapping(List<TmpUserMapping> mappings) {
     String defaultPoolName = null;
     for (TmpUserMapping mapping : mappings) {
       switch (mapping.getType()) {
       case USER: {
-          String poolName = getValidPoolName(poolNames, mapping);
-          Mapping val = new Mapping(poolName, mapping.getPriority());
+          Mapping val = new Mapping(mapping.getPoolName(), mapping.getPriority());
           Mapping oldValue = userMappings.put(mapping.getName(), val);
           if (oldValue != null) {
             throw new AssertionError("Duplicate mapping for user " + mapping.getName() + "; "
@@ -56,7 +55,7 @@ class UserPoolMapping {
         break;
       }
       case DEFAULT: {
-        String poolName = getValidPoolName(poolNames, mapping);
+        String poolName = mapping.getPoolName();
         if (defaultPoolName != null) {
           throw new AssertionError("Duplicate default mapping; "
               + defaultPoolName + " and " + poolName);
@@ -80,13 +79,4 @@ class UserPoolMapping {
     if (userMapping != null) return userMapping.fullPoolName;
     return defaultPoolName;
   }
-
-  private static String getValidPoolName(Set<String> poolNames, TmpUserMapping mapping) {
-    String poolName = mapping.getPoolName();
-    // Should we really be validating here? The plan should be validated before applying.
-    if (!poolNames.contains(mapping.getPoolName())) {
-      throw new AssertionError("Invalid pool in the mapping " + poolName);
-    }
-    return poolName;
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/77b99e4c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 00501ee..0dd1433 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -18,18 +18,30 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.util.concurrent.TimeoutException;
+import com.google.common.util.concurrent.ListenableFuture;
 
+import org.apache.hadoop.hive.ql.exec.Utilities;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 
 public class WmTezSession extends TezSessionPoolSession implements AmPluginNode {
   private String poolName;
   private double clusterFraction;
+  private String killReason = null;
 
   private final Object amPluginInfoLock = new Object();
   private AmPluginInfo amPluginInfo = null;
+  private SettableFuture<WmTezSession> amRegistryFuture = null;
+  private ScheduledFuture<?> timeoutTimer = null;
 
+  private final WorkloadManager wmParent;
 
   /** The actual state of the guaranteed task, and the update state, for the session. */
   // Note: hypothetically, a generic WM-aware-session should not know about guaranteed tasks.
@@ -41,34 +53,60 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   }
   private final ActualWmState actualState = new ActualWmState();
 
-  public WmTezSession(String sessionId, Manager parent,
+  public WmTezSession(String sessionId, WorkloadManager parent,
       SessionExpirationTracker expiration, HiveConf conf) {
     super(sessionId, parent, expiration, conf);
+    wmParent = parent;
   }
 
-  @Override
-  public AmPluginInfo waitForAmPluginInfo(int timeoutMs)
-      throws InterruptedException, TimeoutException {
+  @VisibleForTesting
+  WmTezSession(String sessionId, Manager testParent,
+      SessionExpirationTracker expiration, HiveConf conf) {
+    super(sessionId, testParent, expiration, conf);
+    wmParent = null;
+  }
+
+  public ListenableFuture<WmTezSession> waitForAmRegistryAsync(
+      int timeoutMs, ScheduledExecutorService timeoutPool) {
+    SettableFuture<WmTezSession> future = SettableFuture.create();
     synchronized (amPluginInfoLock) {
-      if (amPluginInfo == null) {
-        amPluginInfoLock.wait(timeoutMs);
-        if (amPluginInfo == null) {
-          throw new TimeoutException("No plugin information for " + getSessionId());
-        }
+      if (amPluginInfo != null) {
+        future.set(this);
+        return future;
       }
-      return amPluginInfo;
+      if (amRegistryFuture != null) {
+        // We don't need this for now, so do not support it.
+        future.setException(new RuntimeException("Multiple waits are not suported"));
+        return future;
+      }
+      amRegistryFuture = future;
+      if (timeoutMs <= 0) return future;
+      // TODO: replace with withTimeout after we get the relevant guava upgrade.
+      this.timeoutTimer = timeoutPool.schedule(
+          new TimeoutRunnable(), timeoutMs, TimeUnit.MILLISECONDS);
     }
+    return future;
   }
 
+
   @Override
   void updateFromRegistry(TezAmInstance si) {
+    AmPluginInfo info = new AmPluginInfo(si.getHost(), si.getPluginPort(),
+        si.getPluginToken(), si.getPluginTokenJobId());
     synchronized (amPluginInfoLock) {
-      this.amPluginInfo = new AmPluginInfo(si.getHost(), si.getPluginPort(),
-          si.getPluginToken(), si.getPluginTokenJobId());
-      amPluginInfoLock.notifyAll();
+      this.amPluginInfo = info;
+      if (amRegistryFuture != null) {
+        amRegistryFuture.set(this);
+        amRegistryFuture = null;
+      }
+      if (timeoutTimer != null) {
+        timeoutTimer.cancel(true);
+        timeoutTimer = null;
+      }
     }
   }
 
+  @Override
   public AmPluginInfo getAmPluginInfo() {
     return amPluginInfo; // Only has final fields, no artifacts from the absence of sync.
   }
@@ -85,6 +123,11 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     this.clusterFraction = fraction;
   }
 
+  void clearWm() {
+    this.poolName = null;
+    this.clusterFraction = 0f;
+  }
+
   double getClusterFraction() {
     return this.clusterFraction;
   }
@@ -118,4 +161,42 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
       return (actualState.sent == actualState.target);
     }
   }
+
+  public void handleUpdateError() {
+    wmParent.addUpdateError(this);
+  }
+
+  @Override
+  public int hashCode() {
+    return System.identityHashCode(this);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj == this;
+  }
+
+  boolean isIrrelevantForWm() {
+    return killReason != null;
+  }
+
+  String getReasonForKill() {
+    return killReason;
+  }
+
+  void setIsIrrelevantForWm(String killReason) {
+    this.killReason = killReason;
+  }
+
+  private final class TimeoutRunnable implements Runnable {
+    @Override
+    public void run() {
+      synchronized (amPluginInfoLock) {
+        timeoutTimer = null;
+        if (amRegistryFuture == null || amRegistryFuture.isDone()) return;
+        amRegistryFuture.cancel(true);
+        amRegistryFuture = null;
+      }
+    }
+  }
 }