Posted to commits@hive.apache.org by se...@apache.org on 2017/11/14 20:49:39 UTC

hive git commit: HIVE-17906 : use kill query mechanics to kill queries in WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 3bfcfdde0 -> 25f3c3f96


HIVE-17906 : use kill query mechanics to kill queries in WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/25f3c3f9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/25f3c3f9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/25f3c3f9

Branch: refs/heads/master
Commit: 25f3c3f9673dc48b648b8c25ba28457139261c0e
Parents: 3bfcfdd
Author: sergey <se...@apache.org>
Authored: Tue Nov 14 12:44:49 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Tue Nov 14 12:44:49 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java |   2 +-
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   |  29 +-
 .../hive/ql/exec/tez/WorkloadManager.java       | 522 ++++++++++++++-----
 .../hive/ql/exec/tez/TestWorkloadManager.java   |  11 +-
 4 files changed, 428 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
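
Most of this change lives in WorkloadManager.java below. As the new class comment there
explains, the manager runs an actor-ish master thread that repeatedly drains a batch of
queued events (EventState) and does the physical work via async calls or worker threads,
so the pool state itself needs no locking. A minimal, self-contained sketch of that
double-buffered event intake follows; it only illustrates the pattern, and none of the
names below are the actual Hive classes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative sketch: producers append under a brief lock; the master
    // thread swaps buffers and processes the whole batch without holding it.
    final class EventLoop {
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition hasEvents = lock.newCondition();
      private List<Runnable> current = new ArrayList<>();
      private List<Runnable> processing = new ArrayList<>();

      void submit(Runnable event) {
        lock.lock();
        try {
          current.add(event);
          hasEvents.signalAll();
        } finally {
          lock.unlock();
        }
      }

      void processOneBatch() throws InterruptedException {
        lock.lock();
        try {
          while (current.isEmpty()) {
            hasEvents.await();
          }
          List<Runnable> swap = processing; // swap buffers under the lock
          processing = current;
          current = swap;
        } finally {
          lock.unlock();
        }
        for (Runnable event : processing) {
          event.run(); // process outside the lock
        }
        processing.clear();
      }
    }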


http://git-wip-us.apache.org/repos/asf/hive/blob/25f3c3f9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 03a0682..2bded71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -459,7 +459,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
 
   @VisibleForTesting
   int getCurrentSize() {
-    poolLock.tryLock();
+    poolLock.lock();
     try {
       return pool.size();
     } finally {
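
A note on the one-line fix above: ReentrantLock.tryLock() acquires the lock only if it is
free and returns a boolean that the old code ignored, so under contention getCurrentSize()
would read pool.size() without holding the lock, and the unlock() presumably sitting in the
finally block just past the shown context would then throw IllegalMonitorStateException.
A minimal sketch of the corrected pattern (illustrative names, not the Hive class):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative sketch: lock() always acquires (blocking if necessary),
    // so the paired unlock() in the finally block is always legal.
    class LockedPool<T> {
      private final ReentrantLock poolLock = new ReentrantLock();
      private final List<T> pool = new ArrayList<>();

      int getCurrentSize() {
        poolLock.lock(); // unlike tryLock(), this cannot silently fail
        try {
          return pool.size();
        } finally {
          poolLock.unlock();
        }
      }
    }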

http://git-wip-us.apache.org/repos/asf/hive/blob/25f3c3f9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 6cf2aad..b770d71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -32,12 +32,17 @@ import org.apache.hadoop.hive.registry.impl.TezAmInstance;
 public class WmTezSession extends TezSessionPoolSession implements AmPluginNode {
   private String poolName;
   private double clusterFraction;
+  /**
+   * The reason to kill an AM. Note that this is for the entire session, not just for a query.
+   * Once set, this can never be unset because you can only kill the session once.
+   */
   private String killReason = null;
 
   private final Object amPluginInfoLock = new Object();
   private AmPluginInfo amPluginInfo = null;
   private SettableFuture<WmTezSession> amRegistryFuture = null;
   private ScheduledFuture<?> timeoutTimer = null;
+  private String queryId;
 
   private final WorkloadManager wmParent;
 
@@ -124,6 +129,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   void clearWm() {
     this.poolName = null;
     this.clusterFraction = 0f;
+    this.queryId = null;
   }
 
   double getClusterFraction() {
@@ -183,14 +189,12 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   }
 
   void setIsIrrelevantForWm(String killReason) {
+    if (killReason == null) {
+      throw new AssertionError("Cannot reset the kill reason " + this.killReason);
+    }
     this.killReason = killReason;
   }
 
-  @Override
-  public String toString() {
-    return super.toString() + ", poolName: " + poolName + ", clusterFraction: " + clusterFraction;
-  }
-
   private final class TimeoutRunnable implements Runnable {
     @Override
     public void run() {
@@ -202,4 +206,19 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
       }
     }
   }
+
+  public void setQueryId(String queryId) {
+    this.queryId = queryId;
+  }
+
+  public String getQueryId() {
+    return this.queryId;
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + ", WM state poolName=" + poolName + ", clusterFraction="
+        + clusterFraction + ", queryId=" + queryId + ", killReason=" + killReason;
+  }
+
 }
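
The setIsIrrelevantForWm() guard added above enforces that a kill reason, once set, is never
cleared back to null - as the new field comment puts it, you can only kill the session once.
The same fail-fast guard in isolation (an illustrative sketch, not the Hive class):

    // Illustrative sketch of the killReason invariant: the reason may be set,
    // but any attempt to reset it to null fails fast.
    final class KillReasonHolder {
      private String killReason = null;

      void setIsIrrelevant(String killReason) {
        if (killReason == null) {
          throw new AssertionError("Cannot reset the kill reason " + this.killReason);
        }
        this.killReason = killReason;
      }

      boolean isIrrelevant() {
        return killReason != null;
      }
    }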

http://git-wip-us.apache.org/repos/asf/hive/blob/25f3c3f9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index bdbcce5..0fdaf55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
 import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.KillQuery;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
@@ -67,35 +70,51 @@ import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 
-/** Workload management entry point for HS2. */
+/** Workload management entry point for HS2.
+ * Note on how this class operates.
+ * There are tons of things that could be happening in parallel that are a real pain to sync.
+ * Therefore, it uses an actor-ish model where the master thread, in the processCurrentEvents
+ * method, processes batches of events that have accumulated since the previous iteration,
+ * repeatedly and quickly, doing physical work via async calls or via worker threads.
+ * That way the bulk of the state (pools, etc.) does not require any sync, and we mostly have
+ * a consistent view of the conflicting events when we process things. However, that also means
+ * none of that state can be accessed directly - most changes that touch pool state, or interact
+ * with background operations like init, need to go through EventState; see e.g. returnAfterUse.
+ */
 public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
   implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
   private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
   private static final char POOL_SEPARATOR = '.';
   private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
 
+  // Various final services, configs, etc.
   private final HiveConf conf;
   private final TezSessionPool<WmTezSession> tezAmPool;
   private final SessionExpirationTracker expirationTracker;
   private final RestrictedConfigChecker restrictedConfig;
   private final QueryAllocationManager allocationManager;
   private final String yarnQueue;
+  private final int amRegistryTimeoutMs;
   // Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
   //       sessions, so the pool itself could internally track the sessions it gave out, since
   //       calling close on an unopened session is probably harmless.
   private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions =
-    new IdentityHashMap<>();
-  private final int amRegistryTimeoutMs;
-
-  // Note: pools can only be modified by the master thread.
-  private HashMap<String, PoolState> pools;
-  // Used to make sure that waiting getSessions don't block update.
-  private UserPoolMapping userPoolMapping;
-  private int totalQueryParallelism;
+      new IdentityHashMap<>();
   // We index the get requests to make sure there are no ordering artifacts when we requeue.
   private final AtomicLong getRequestVersion = new AtomicLong(Long.MIN_VALUE);
 
-  private PerPoolTriggerValidatorRunnable triggerValidatorRunnable;
+  // The below group of fields (pools, etc.) can only be modified by the master thread.
+  private Map<String, PoolState> pools;
+  private int totalQueryParallelism;
+  /**
+   * The queries being killed. This is used to sync between the background kill finishing and the
+   * query finishing and user returning the sessions, which can happen in separate iterations
+   * of the master thread processing, yet need to be aware of each other.
+   */
+  private Map<WmTezSession, KillQueryContext> killQueryInProgress = new IdentityHashMap<>();
+  // Used to make sure that waiting getSessions don't block update.
+  private UserPoolMapping userPoolMapping;
+  // End of master thread state
 
   // Note: we could use RW lock to allow concurrent calls for different sessions, however all
   //       those calls do is add elements to lists and maps; and we'd need to sync those separately
@@ -107,8 +126,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private final EventState one = new EventState(), two = new EventState();
   private boolean hasChanges = false;
   private EventState current = one;
+  private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
+  // End sync stuff.
+
+  private PerPoolTriggerValidatorRunnable triggerValidatorRunnable;
   private Map<String, SessionTriggerProvider> perPoolProviders = new ConcurrentHashMap<>();
 
+  private SessionTriggerProvider sessionTriggerProvider;
+  private TriggerActionHandler triggerActionHandler;
+
+  // The master thread and various workers.
  /** The master thread that processes the events from EventState. */
   @VisibleForTesting
   protected final Thread wmThread;
@@ -116,7 +143,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private final ExecutorService workPool;
   /** Used to schedule timeouts for some async operations. */
   private final ScheduledExecutorService timeoutPool;
-  private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
+
+  // The initial plan initialization future, used to wait for the plan to apply during setup.
   private ListenableFuture<Boolean> initRpFuture;
 
   private static final FutureCallback<Object> FATAL_ERROR_CALLBACK = new FutureCallback<Object>() {
@@ -131,7 +159,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   };
 
-  // TODO: this is temporary before HiveServerEnvironment is merged.
   private static volatile WorkloadManager INSTANCE;
 
   public static WorkloadManager getInstance() {
@@ -182,7 +209,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     startTriggerValidator(triggerValidationIntervalMs);
   }
 
-  private int determineQueryParallelism(WMFullResourcePlan plan) {
+  private static int determineQueryParallelism(WMFullResourcePlan plan) {
     int result = 0;
     for (WMPool pool : plan.getPools()) {
       result += pool.getQueryParallelism();
@@ -245,7 +272,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   /** Represent a single iteration of work for the master thread. */
   private final static class EventState {
     private final Set<WmTezSession> toReturn = Sets.newIdentityHashSet(),
-      toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
+        toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
+    private final Map<WmTezSession, Boolean> killQueryResults = new IdentityHashMap<>();
     private final LinkedList<SessionInitContext> initResults = new LinkedList<>();
     private final IdentityHashMap<WmTezSession, SettableFuture<WmTezSession>> toReopen =
       new IdentityHashMap<>();
@@ -279,8 +307,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
    * (mostly opening and closing the sessions).
    */
   private final static class WmThreadSyncWork {
-    private LinkedList<WmTezSession> toRestartInUse = new LinkedList<>(),
-      toDestroyNoRestart = new LinkedList<>();
+    private List<WmTezSession> toRestartInUse = new LinkedList<>(),
+        toDestroyNoRestart = new LinkedList<>();
+    private Map<WmTezSession, KillQueryContext> toKillQuery = new IdentityHashMap<>();
   }
 
   private void runWmThread() {
@@ -330,9 +359,43 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private void scheduleWork(WmThreadSyncWork context) {
     // Do the work that cannot be done via async calls.
 
-    // 1. Restart pool sessions.
+    // 1. Kill queries.
+    for (KillQueryContext killCtx : context.toKillQuery.values()) {
+      final WmTezSession toKill = killCtx.session;
+      final String reason = killCtx.reason;
+      LOG.info("Killing query for {}", toKill);
+      workPool.submit(() -> {
+        // Note: we get the query ID here rather than in the caller, where doing so would be
+        //       more correct because there we know exactly which query we intend to kill.
+        //       This is valid because we do not expect the query ID to change - we never
+        //       reuse the session for which a query is being killed until both the kill and
+        //       the user return it.
+        String queryId = toKill.getQueryId();
+        KillQuery kq = toKill.getKillQuery();
+        if (kq != null && queryId != null) {
+          LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
+          try {
+            kq.killQuery(queryId, reason);
+            addKillQueryResult(toKill, true);
+            LOG.debug("Killed " + queryId);
+            return;
+          } catch (HiveException ex) {
+            LOG.error("Failed to kill " + queryId + "; will try to restart AM instead", ex);
+          }
+        } else {
+          LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq);
+        }
+        // We cannot restart in place because the user might receive a failure and return the
+        // session to the master thread without the "irrelevant" flag set. In fact, the query might
+        // have succeeded in the gap and the session might already be returned. Queue the
+        // restart through the master thread.
+        addKillQueryResult(toKill, false);
+      });
+    }
+    context.toKillQuery.clear();
+
+    // 2. Restart pool sessions.
     for (final WmTezSession toRestart : context.toRestartInUse) {
-      LOG.info("Replacing " + toRestart + " with a new session");
+      LOG.info("Replacing {} with a new session", toRestart);
       workPool.submit(() -> {
         try {
           // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
@@ -343,9 +406,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       });
     }
     context.toRestartInUse.clear();
-    // 2. Destroy the sessions that we don't need anymore.
+
+    // 3. Destroy the sessions that we don't need anymore.
     for (final WmTezSession toDestroy : context.toDestroyNoRestart) {
-      LOG.info("Closing " + toDestroy + " without restart");
+      LOG.info("Closing {} without restart", toDestroy);
       workPool.submit(() -> {
         try {
           toDestroy.close(false);
@@ -357,6 +421,17 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     context.toDestroyNoRestart.clear();
   }
 
+  /**
+   * This is the main method of the master thread; it processes one set of events.
+   * Be mindful of the fact that events can be queued while we are processing events, so
+   * in addition to making sure we keep the current set consistent (e.g. no need to handle
+   * update errors for a session that should already be destroyed), this needs to guard itself
+   * against future iterations - e.g. what happens if we kill a query due to a plan change,
+   * but the DAG finishes before the kill happens and the user queues a "return" event? Etc.
+   * DO NOT block for a long time in this method.
+   * @param e Input events.
+   * @param syncWork Output tasks that cannot be called via async methods.
+   */
   private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throws Exception {
     // The order of processing is as follows. We'd reclaim or kill all the sessions that we can
     // reclaim from various user actions and errors, then apply the new plan if any,
@@ -374,40 +449,61 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
     e.initResults.clear();
 
-    // 1. Handle sessions that are being destroyed by users. Destroy implies return.
+    // 1. Handle kill query results - part 1, just put them in place. We will resolve what
+    //    to do with the sessions after we go thru all the concurrent user actions.
+    for (Map.Entry<WmTezSession, Boolean> entry : e.killQueryResults.entrySet()) {
+      WmTezSession killQuerySession = entry.getKey();
+      boolean killResult = entry.getValue();
+      LOG.debug("Processing KillQuery {} for {}",
+          killResult ? "success" : "failure", killQuerySession);
+      // Note: do not cancel any user actions here; user actions actually interact with kills.
+      KillQueryContext killCtx = killQueryInProgress.get(killQuerySession);
+      if (killCtx == null) {
+        LOG.error("Internal error - cannot find the context for killing {}", killQuerySession);
+        continue;
+      }
+      killCtx.handleKillQueryCallback(!killResult);
+    }
+    e.killQueryResults.clear();
+
+    // 2. Handle sessions that are being destroyed by users. Destroy implies return.
     for (WmTezSession sessionToDestroy : e.toDestroy) {
       if (e.toReturn.remove(sessionToDestroy)) {
         LOG.warn("The session was both destroyed and returned by the user; destroying");
       }
       LOG.info("Destroying {}", sessionToDestroy);
-      Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
-        e, sessionToDestroy, poolsToRedistribute);
-      if (shouldReturn == null || shouldReturn) {
-        // Restart if this session is still relevant, even if there's an internal error.
+      RemoveSessionResult rr = handleReturnedInUseSessionOnMasterThread(
+          e, sessionToDestroy, poolsToRedistribute, false);
+      if (rr == RemoveSessionResult.OK || rr == RemoveSessionResult.NOT_FOUND) {
+        // Restart even if there's an internal error.
         syncWork.toRestartInUse.add(sessionToDestroy);
       }
     }
     e.toDestroy.clear();
 
-    // 2. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
+    // 3. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
     for (WmTezSession sessionToReturn: e.toReturn) {
       LOG.info("Returning {}", sessionToReturn);
-      Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
-        e, sessionToReturn, poolsToRedistribute);
-      if (shouldReturn == null) {
-        // Restart if there's an internal error.
-        syncWork.toRestartInUse.add(sessionToReturn);
-        continue;
-      }
-      if (!shouldReturn) continue;
-      boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
-      if (!wasReturned) {
-        syncWork.toDestroyNoRestart.add(sessionToReturn);
+      RemoveSessionResult rr = handleReturnedInUseSessionOnMasterThread(
+          e, sessionToReturn, poolsToRedistribute, true);
+      switch (rr) {
+      case OK:
+        boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
+        if (!wasReturned) {
+          syncWork.toDestroyNoRestart.add(sessionToReturn);
+        }
+        break;
+      case NOT_FOUND:
+        syncWork.toRestartInUse.add(sessionToReturn); // Restart if there's an internal error.
+        break;
+      case IGNORE:
+        break;
+      default: throw new AssertionError("Unknown state " + rr);
       }
     }
     e.toReturn.clear();
 
-    // 3. Reopen is essentially just destroy + get a new session for a session in use.
+    // 4. Reopen is essentially just destroy + get a new session for a session in use.
     for (Map.Entry<WmTezSession, SettableFuture<WmTezSession>> entry : e.toReopen.entrySet()) {
       LOG.info("Reopening {}", entry.getKey());
       handeReopenRequestOnMasterThread(
@@ -415,14 +511,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
     e.toReopen.clear();
 
-    // 4. All the sessions in use that were not destroyed or returned with a failed update now die.
+    // 5. All the sessions in use that were not destroyed or returned with a failed update now die.
     for (WmTezSession sessionWithUpdateError : e.updateErrors) {
       LOG.info("Update failed for {}", sessionWithUpdateError);
       handleUpdateErrorOnMasterThread(sessionWithUpdateError, e, syncWork, poolsToRedistribute);
     }
     e.updateErrors.clear();
 
-    // 5. Now apply a resource plan if any. This is expected to be pretty rare.
+    // 6. Now apply a resource plan if any. This is expected to be pretty rare.
     boolean hasRequeues = false;
     if (e.resourcePlanToApply != null) {
       LOG.info("Applying new resource plan");
@@ -432,21 +528,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
     e.resourcePlanToApply = null;
 
-    // 6. Handle any move session requests. The way move session works right now is
+    // 7. Handle any move session requests. The way move session works right now is
     // a) sessions get moved to destination pool if there is capacity in destination pool
     // b) if there is no capacity in destination pool, the session gets killed (since we cannot pause a query)
     // TODO: in future this the process of killing can be delayed until the point where a session is actually required.
     // We could consider delaying the move (when destination capacity is full) until there is claim in src pool.
     // May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long
     // as possible
-    if (e.moveSessions != null) {
-      for (MoveSession moveSession : e.moveSessions) {
-        handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute);
-      }
+    for (MoveSession moveSession : e.moveSessions) {
+      handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute);
     }
     e.moveSessions.clear();
 
-    // 7. Handle all the get/reuse requests. We won't actually give out anything here, but merely
+    // 8. Handle all the get/reuse requests. We won't actually give out anything here, but merely
     //    map all the requests and place them in an appropriate order in pool queues. The only
     //    exception is the reuse without queue contention; can be granted immediately. If we can't
     //    reuse the session immediately, we will convert the reuse to a normal get, because we
@@ -458,14 +552,42 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
     e.toReuse.clear();
 
-    // 8. If there was a cluster state change, make sure we redistribute all the pools.
+    // 9. Resolve all the kill query requests in flight. Nothing below can affect them.
+    Iterator<KillQueryContext> iter = killQueryInProgress.values().iterator();
+    while (iter.hasNext()) {
+      KillQueryContext ctx = iter.next();
+      KillQueryResult kr = ctx.process();
+      switch (kr) {
+      case IN_PROGRESS: continue; // Either the user or the kill is not done yet.
+      case OK: {
+        iter.remove();
+        LOG.debug("Kill query succeeded; returning to the pool: {}", ctx.session);
+        if (!tezAmPool.returnSessionAsync(ctx.session)) {
+          syncWork.toDestroyNoRestart.add(ctx.session);
+        }
+        break;
+      }
+      case RESTART_REQUIRED: {
+        iter.remove();
+        LOG.debug("Kill query failed; restarting: {}", ctx.session);
+        // Note: we assume here the session, before we resolve killQuery result here, is still
+        //       "in use". That is because all the user ops above like return, reopen, etc.
+        //       don't actually return/reopen/... when kill query is in progress.
+        syncWork.toRestartInUse.add(ctx.session);
+        break;
+      }
+      default: throw new AssertionError("Unknown state " + kr);
+      }
+    }
+
+    // 10. If there was a cluster state change, make sure we redistribute all the pools.
     if (e.hasClusterStateChanged) {
       LOG.info("Processing a cluster state change");
       poolsToRedistribute.addAll(pools.keySet());
       e.hasClusterStateChanged = false;
     }
 
-    // 9. Finally, for all the pools that have changes, promote queued queries and rebalance.
+    // 11. Finally, for all the pools that have changes, promote queued queries and rebalance.
     for (String poolName : poolsToRedistribute) {
       if (LOG.isDebugEnabled()) {
         LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
@@ -473,7 +595,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
     }
 
-    // 10. Notify tests and global async ops.
+    // 12. Save state for future iterations.
+    for (KillQueryContext killCtx : syncWork.toKillQuery.values()) {
+      if (killQueryInProgress.put(killCtx.session, killCtx) != null) {
+        LOG.error("One query killed several times - internal error {}", killCtx.session);
+      }
+    }
+
+    // 13. Notify tests and global async ops.
     if (e.testEvent != null) {
       e.testEvent.set(true);
       e.testEvent = null;
@@ -491,8 +620,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     LOG.info("Handling move session event: {}", moveSession);
     if (validMove(moveSession.srcSession, destPoolName)) {
       // remove from src pool
-      Boolean removed = checkAndRemoveSessionFromItsPool(moveSession.srcSession, poolsToRedistribute);
-      if (removed != null && removed) {
+      RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+          moveSession.srcSession, poolsToRedistribute, true);
+      if (rr == RemoveSessionResult.OK) {
         // check if there is capacity in dest pool, if so move else kill the session
         if (capacityAvailable(destPoolName)) {
           // add to destination pool
@@ -505,7 +635,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           }
         } else {
           moveSession.srcSession.clearWm();
-          moveSession.srcSession.setIsIrrelevantForWm("Destination pool " + destPoolName + " is full. Killing query.");
+          moveSession.srcSession.setIsIrrelevantForWm("Destination pool "
+              + destPoolName + " is full. Killing query.");
           syncWork.toRestartInUse.add(moveSession.srcSession);
         }
       } else {
@@ -575,8 +706,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  private Boolean handleReturnedInUseSessionOnMasterThread(
-    EventState e, WmTezSession session, HashSet<String> poolsToRedistribute) {
+  private RemoveSessionResult handleReturnedInUseSessionOnMasterThread(
+      EventState e, WmTezSession session, HashSet<String> poolsToRedistribute, boolean isReturn) {
     // This handles the common logic for destroy and return - everything except
     // the invalid combination of destroy and return themselves, as well as the actual
     // statement that destroys or returns it.
@@ -591,7 +722,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     if (reuseRequest != null) {
       reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
     }
-    return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
+    return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn);
   }
 
   private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
@@ -608,26 +739,32 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // anything. Instead, we will try to give out an existing session from the pool, and restart
     // the problematic one in background.
     String poolName = session.getPoolName();
-    Boolean isRemoved = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
-    // If we fail to remove, it's probably an internal error. We'd try to handle it the same way
-    // as above - by restarting the session. We'd fail the caller to avoid exceeding parallelism.
-    if (isRemoved == null) {
-      future.setException(new RuntimeException("Reopen failed due to an internal error"));
+    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, false);
+    switch (rr) {
+    case OK:
+      // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
+      PoolState pool = pools.get(poolName);
+      SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId());
+      // We have just removed the session from the same pool, so don't check concurrency here.
+      pool.initializingSessions.add(sw);
+      ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+      Futures.addCallback(getFuture, sw);
       syncWork.toRestartInUse.add(session);
       return;
-    } else if (!isRemoved) {
+    case IGNORE:
+      // Reopen implies the use of the reopened session for the same query that we gave it out
+      // for; so, as we would have failed an active query, fail the user before it's started.
       future.setException(new RuntimeException("WM killed this session during reopen: "
-        + session.getReasonForKill()));
-      return; // No longer relevant for WM - bail.
+          + session.getReasonForKill()));
+      return; // No longer relevant for WM.
+    case NOT_FOUND:
+      // If we fail to remove, it's probably an internal error. We'd try to handle it the same way
+      // as above - by restarting the session. We'd fail the caller to avoid exceeding parallelism.
+      future.setException(new RuntimeException("Reopen failed due to an internal error"));
+      syncWork.toRestartInUse.add(session);
+      return;
+    default: throw new AssertionError("Unknown state " + rr);
     }
-    // If pool didn't exist, removeSessionFromItsPool would have returned null.
-    PoolState pool = pools.get(poolName);
-    SessionInitContext sw = new SessionInitContext(future, poolName);
-    // We have just removed the session from the same pool, so don't check concurrency here.
-    pool.initializingSessions.add(sw);
-    ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
-    Futures.addCallback(getFuture, sw);
-    syncWork.toRestartInUse.add(session);
   }
 
   private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError,
@@ -640,17 +777,23 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // TODO: we should communicate this to the user more explicitly (use kill query API, or
     //       add an option for bg kill checking to TezTask/monitor?
     // We are assuming the update-error AM is bad and just try to kill it.
-    Boolean isRemoved = checkAndRemoveSessionFromItsPool(sessionWithUpdateError, poolsToRedistribute);
-    if (isRemoved != null && !isRemoved) {
-      // An update error for some session that was actually already killed by us.
-      return;
+    RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+        sessionWithUpdateError, poolsToRedistribute, null);
+    switch (rr) {
+    case OK:
+    case NOT_FOUND:
+      // Regardless of whether it was removed successfully or after failing to remove, restart it.
+      // Since we just restart this from under the user, mark it so we handle it properly when
+      // the user tries to actually use this session and fails, proceeding to return/destroy it.
+      sessionWithUpdateError.setIsIrrelevantForWm("Failed to update resource allocation");
+      // We assume AM might be bad so we will not try to kill the query here; just scrap the AM.
+      syncWork.toRestartInUse.add(sessionWithUpdateError);
+      break;
+    case IGNORE:
+      return; // An update error for some session that was actually already killed by us.
+    default:
+      throw new AssertionError("Unknown state " + rr);
     }
-    // Regardless whether it was removed successfully or after failing to remove, restart it.
-    // Since we just restart this from under the user, mark it so we handle it properly when
-    // the user tries to actually use this session and fails, proceeding to return/destroy it.
-    // TODO: propagate this error to TezJobMonitor somehow, after we add the use of KillQuery.
-    sessionWithUpdateError.setIsIrrelevantForWm("Failed to update resource allocation");
-    syncWork.toRestartInUse.add(sessionWithUpdateError);
   }
 
   private void applyNewResourcePlanOnMasterThread(
@@ -661,7 +804,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     //       that fractions or query parallelism add up, etc.
     this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings(),
         e.resourcePlanToApply.getPlan().getDefaultPoolPath());
-    HashMap<String, PoolState> oldPools = pools;
+    Map<String, PoolState> oldPools = pools;
     pools = new HashMap<>();
 
     // For simplicity, to always have parents while storing pools in a flat structure, we'll
@@ -692,7 +835,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           state = new PoolState(fullName, qp, fraction);
         } else {
           // This will also take care of the queries if query parallelism changed.
-          state.update(qp, fraction, syncWork.toRestartInUse, e);
+          state.update(qp, fraction, syncWork.toKillQuery, e);
           poolsToRedistribute.add(fullName);
         }
         state.setTriggers(new LinkedList<Trigger>());
@@ -719,7 +862,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     if (oldPools != null && !oldPools.isEmpty()) {
       // Looks like some pools were removed; kill running queries, re-queue the queued ones.
       for (PoolState oldPool : oldPools.values()) {
-        oldPool.destroy(syncWork.toRestartInUse, e.getRequests, e.toReuse);
+        oldPool.destroy(syncWork.toKillQuery, e.getRequests, e.toReuse);
       }
     }
 
@@ -728,17 +871,34 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     this.totalQueryParallelism = totalQueryParallelism;
     if (deltaSessions == 0) return; // Nothing to do.
     if (deltaSessions < 0) {
-      // First, see if we have unused sessions that we were planning to restart; get rid of those.
-      int toTransfer = Math.min(-deltaSessions, syncWork.toRestartInUse.size());
-      for (int i = 0; i < toTransfer; ++i) {
-        syncWork.toDestroyNoRestart.add(syncWork.toRestartInUse.pollFirst());
-      }
-      deltaSessions += toTransfer;
+      // First, see if we have sessions that we were planning to restart/kill; get rid of those.
+      deltaSessions = transferSessionsToDestroy(
+          syncWork.toKillQuery.keySet(), syncWork.toDestroyNoRestart, deltaSessions);
+      deltaSessions = transferSessionsToDestroy(
+          syncWork.toRestartInUse, syncWork.toDestroyNoRestart, deltaSessions);
     }
     if (deltaSessions != 0) {
-      failOnFutureFailure(tezAmPool.resizeAsync(
-        deltaSessions, syncWork.toDestroyNoRestart));
+      failOnFutureFailure(tezAmPool.resizeAsync(deltaSessions, syncWork.toDestroyNoRestart));
+    }
+  }
+
+  private static int transferSessionsToDestroy(Collection<WmTezSession> source,
+      List<WmTezSession> toDestroy, int deltaSessions) {
+    // We were going to kill some queries and reuse the sessions, or maybe restart and put the new
+    // ones back into the AM pool. However, the AM pool has shrunk, so we will close them instead.
+    if (deltaSessions >= 0) return deltaSessions;
+    int toTransfer = Math.min(-deltaSessions, source.size());
+    Iterator<WmTezSession> iter = source.iterator();
+    for (int i = 0; i < toTransfer; ++i) {
+      WmTezSession session = iter.next();
+      LOG.debug("Will destroy {} instead of restarting", session);
+      if (!session.isIrrelevantForWm()) {
+        session.setIsIrrelevantForWm("Killed due to workload management plan change");
+      }
+      toDestroy.add(session);
+      iter.remove();
     }
+    return deltaSessions + toTransfer;
   }
 
   @SuppressWarnings("unchecked")
@@ -768,9 +928,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // Kills that could have removed it must have cleared sessionToReuse.
       String oldPoolName = req.sessionToReuse.getPoolName();
       oldPool = pools.get(oldPoolName);
-      Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
-      if (isRemoved == null || !isRemoved) {
-        // This is probably an internal error... abandon the reuse attempt.
+      RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+          req.sessionToReuse, poolsToRedistribute, true);
+      if (rr != RemoveSessionResult.OK) {
+        // Abandon the reuse attempt.
         returnSessionOnFailedReuse(req, syncWork, null);
         req.sessionToReuse = null;
       } else if (pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism) {
@@ -786,6 +947,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // If we can immediately reuse a session, there's nothing to wait for - just return.
       req.sessionToReuse.setPoolName(poolName);
       req.sessionToReuse.setQueueName(yarnQueue);
+      req.sessionToReuse.setQueryId(req.queryId);
       pool.sessions.add(req.sessionToReuse);
       if (pool != oldPool) {
         poolsToRedistribute.add(poolName);
@@ -822,7 +984,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // Note that in theory, we are guaranteed to have a session waiting for us here, but
       // the expiration, failures, etc. may cause one to be missing pending restart.
       // See SessionInitContext javadoc.
-      SessionInitContext sw = new SessionInitContext(queueReq.future, poolName);
+      SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId);
       ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
       Futures.addCallback(getFuture, sw);
       // It is possible that all the async methods returned on the same thread because the
@@ -847,32 +1009,54 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   }
 
   private void returnSessionOnFailedReuse(
-    GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
-    if (req.sessionToReuse == null) return;
+      GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+    WmTezSession session = req.sessionToReuse;
+    if (session == null) return;
+    req.sessionToReuse = null;
     if (poolsToRedistribute != null) {
-      Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
-      // The session cannot have been killed; this happens after all the kills in the current
-      // iteration, so we would have cleared sessionToReuse when killing this.
-      assert isRemoved == null || isRemoved;
+      RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+          session, poolsToRedistribute, true);
+      // The session cannot have been killed just now; this happens after all the kills in
+      // the current iteration, so we would have cleared sessionToReuse when killing this.
+      boolean isOk = (rr == RemoveSessionResult.OK);
+      assert isOk || rr == RemoveSessionResult.IGNORE;
+      if (!isOk) return;
     }
-    if (!tezAmPool.returnSessionAsync(req.sessionToReuse)) {
-      syncWork.toDestroyNoRestart.add(req.sessionToReuse);
+    if (!tezAmPool.returnSessionAsync(session)) {
+      syncWork.toDestroyNoRestart.add(session);
     }
-    req.sessionToReuse = null;
+  }
+
+  /** The result of trying to remove a presumably-active session from a pool on a user request. */
+  private static enum RemoveSessionResult {
+    OK, // Normal case - an active session was removed from the pool.
+    IGNORE, // Session was restarted out of bounds, any user-side handling should be ignored.
+            // Or, session is being killed, need to coordinate between that and the user.
+            // These two cases don't need to be distinguished for now.
+    NOT_FOUND // The session is active but not found in the pool - internal error.
   }
 
   /**
   * Checks if the session is still relevant for WM and if yes, removes it from its pool.
+   * @param isSessionOk Whether the user thinks the session being returned in some way is ok;
+   *                    true means it is (return, reuse); false means it isn't (reopen, destroy);
+   *                    null means this is not a user call.
   * @return OK if the session was removed; IGNORE if the session was already processed by the
   *         WM thread (so we are dealing with an outdated request); NOT_FOUND if the session
   *         should be in WM but wasn't found in the requisite pool (internal error?).
    */
-  private Boolean checkAndRemoveSessionFromItsPool(
-    WmTezSession session, HashSet<String> poolsToRedistribute) {
+  private RemoveSessionResult checkAndRemoveSessionFromItsPool(
+      WmTezSession session, HashSet<String> poolsToRedistribute, Boolean isSessionOk) {
     // It is possible for some request to be queued after a main thread has decided to kill this
     // session; on the next iteration, we'd be processing that request with an irrelevant session.
     if (session.isIrrelevantForWm()) {
-      return false;
+      return RemoveSessionResult.IGNORE;
+    }
+    if (killQueryInProgress.containsKey(session)) {
+      if (isSessionOk != null) {
+        killQueryInProgress.get(session).handleUserCallback(!isSessionOk);
+      }
+      return RemoveSessionResult.IGNORE;
     }
     // If we did not kill this session we expect everything to be present.
     String poolName = session.getPoolName();
@@ -881,11 +1065,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       PoolState pool = pools.get(poolName);
       session.clearWm();
       if (pool != null && pool.sessions.remove(session)) {
-        return true;
+        return RemoveSessionResult.OK;
       }
     }
     LOG.error("Session was not in the pool (internal error) " + poolName + ": " + session);
-    return null;
+    return RemoveSessionResult.NOT_FOUND;
   }
 
   private Boolean checkAndAddSessionToAnotherPool(
@@ -955,11 +1139,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private final MappingInput mappingInput;
     private final SettableFuture<WmTezSession> future;
     private WmTezSession sessionToReuse;
+    private final String queryId;
 
-    private GetRequest(MappingInput mappingInput, SettableFuture<WmTezSession> future,
-        WmTezSession sessionToReuse, long order) {
+    private GetRequest(MappingInput mappingInput, String queryId,
+        SettableFuture<WmTezSession> future,  WmTezSession sessionToReuse, long order) {
       assert mappingInput != null;
       this.mappingInput = mappingInput;
+      this.queryId = queryId;
       this.future = future;
       this.sessionToReuse = sessionToReuse;
       this.order = order;
@@ -975,10 +1161,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       TezSessionState session, MappingInput input, HiveConf conf) throws Exception {
     // Note: not actually used for pool sessions; verify some things like doAs are not set.
     validateConfig(conf);
+    String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
     SettableFuture<WmTezSession> future = SettableFuture.create();
     WmTezSession wmSession = checkSessionForReuse(session);
     GetRequest req = new GetRequest(
-        input, future, wmSession, getRequestVersion.incrementAndGet());
+        input, queryId, future, wmSession, getRequestVersion.incrementAndGet());
     currentLock.lock();
     try {
       current.getRequests.add(req);
@@ -1049,6 +1236,17 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  private void addKillQueryResult(WmTezSession toKill, boolean success) {
+    currentLock.lock();
+    try {
+      current.killQueryResults.put(toKill, success);
+      notifyWmThreadUnderLock();
+    } finally {
+      currentLock.unlock();
+    }
+  }
+
   @VisibleForTesting
   /**
    * Adds a test event that's processed at the end of WM iteration.
@@ -1271,7 +1469,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
 
     public void update(int queryParallelism, double fraction,
-      List<WmTezSession> toKill, EventState e) {
+        Map<WmTezSession, KillQueryContext> toKill, EventState e) {
       this.finalFraction = this.finalFractionRemaining = fraction;
       this.queryParallelism = queryParallelism;
       // TODO: two possible improvements
@@ -1290,8 +1488,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       }
     }
 
-    public void destroy(List<WmTezSession> toKill, LinkedList<GetRequest> globalQueue,
-      IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+    public void destroy(Map<WmTezSession, KillQueryContext> toKill,
+        LinkedList<GetRequest> globalQueue, IdentityHashMap<WmTezSession, GetRequest> toReuse) {
       extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill);
       // All the pending get requests should just be requeued elsewhere.
       // Note that we never queue session reuse so sessionToReuse would be null.
@@ -1323,10 +1521,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
 
     private void extractAllSessionsToKill(String killReason,
-      IdentityHashMap<WmTezSession, GetRequest> toReuse, List<WmTezSession> toKill) {
+        IdentityHashMap<WmTezSession, GetRequest> toReuse,
+        Map<WmTezSession, KillQueryContext> toKill) {
       for (WmTezSession sessionToKill : sessions) {
-        resetRemovedSession(sessionToKill, killReason, toReuse);
-        toKill.add(sessionToKill);
+        resetRemovedSession(sessionToKill, toReuse);
+        toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
       }
       sessions.clear();
       for (SessionInitContext initCtx : initializingSessions) {
@@ -1336,16 +1535,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         if (sessionToKill == null) {
           continue; // Async op in progress; the callback will take care of this.
         }
-        resetRemovedSession(sessionToKill, killReason, toReuse);
-        toKill.add(sessionToKill);
+        resetRemovedSession(sessionToKill, toReuse);
+        toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
       }
       initializingSessions.clear();
     }
 
-    private void resetRemovedSession(WmTezSession sessionToKill, String killReason,
-      IdentityHashMap<WmTezSession, GetRequest> toReuse) {
-      assert killReason != null;
-      sessionToKill.setIsIrrelevantForWm(killReason);
+    private void resetRemovedSession(WmTezSession sessionToKill,
+        IdentityHashMap<WmTezSession, GetRequest> toReuse) {
       sessionToKill.clearWm();
       GetRequest req = toReuse.remove(sessionToKill);
       if (req != null) {
@@ -1375,7 +1572,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
    * for async session initialization, as well as parallel cancellation.
    */
   private final class SessionInitContext implements FutureCallback<WmTezSession> {
-    private final String poolName;
+    private final String poolName, queryId;
 
     private final ReentrantLock lock = new ReentrantLock();
     private WmTezSession session;
@@ -1383,10 +1580,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private SessionInitState state;
     private String cancelReason;
 
-    public SessionInitContext(SettableFuture<WmTezSession> future, String poolName) {
+    public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId) {
       this.state = SessionInitState.GETTING;
       this.future = future;
       this.poolName = poolName;
+      this.queryId = queryId;
     }
 
     @Override
@@ -1402,6 +1600,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           assert this.state == SessionInitState.GETTING;
           session.setPoolName(poolName);
           session.setQueueName(yarnQueue);
+          session.setQueryId(queryId);
           this.session = session;
           this.state = SessionInitState.WAITING_FOR_REGISTRY;
           break;
@@ -1448,6 +1647,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
             "The query was killed by workload management: " + cancelReason));
         session.setPoolName(null);
         session.setClusterFraction(0f);
+        session.setQueryId(null);
         tezAmPool.returnSession(session);
         break;
       }
@@ -1541,6 +1741,74 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     return userPoolMapping.mapSessionToPoolName(input) != null;
   }
 
+  private static enum KillQueryResult {
+    OK,
+    RESTART_REQUIRED,
+    IN_PROGRESS
+  }
+
+  /**
+   * When we kill a query without killing a session, two things need to come back before reuse.
+   * First, the kill-query call itself should complete; second, the user should handle it and
+   * let go of the session (or the query could finish and the user could give the session back
+   * even before we try to kill the query). We also need to handle cases where the user gives
+   * up on the session even before we kill it, or where the kill fails while the user is
+   * happily computing away. This class collects and makes sense of the state around all this.
+   */
+  private static final class KillQueryContext {
+    private final String reason;
+    private final WmTezSession session;
+    // Note: all the fields are only modified by master thread.
+    private boolean isUserDone = false, isKillDone = false,
+        hasKillFailed = false, hasUserFailed = false;
+
+    public KillQueryContext(WmTezSession session, String reason) {
+      this.session = session;
+      this.reason = reason;
+    }
+
+    private void handleKillQueryCallback(boolean hasFailed) {
+      isKillDone = true;
+      hasKillFailed = hasFailed;
+    }
+
+    private void handleUserCallback(boolean hasFailed) {
+      if (isUserDone) {
+        LOG.warn("Duplicate user call for a session being killed; ignoring");
+        return;
+      }
+      isUserDone = true;
+      hasUserFailed = hasFailed;
+    }
+
+    private KillQueryResult process() {
+      if (!isUserDone && hasKillFailed) {
+        // The user has not returned and the kill has failed.
+        // We are going to brute force kill the AM; whatever user does is now irrelevant.
+        session.setIsIrrelevantForWm(reason);
+        return KillQueryResult.RESTART_REQUIRED;
+      }
+      if (!isUserDone || !isKillDone) return KillQueryResult.IN_PROGRESS; // Someone is not done.
+      // Both user and the kill have returned.
+      if (hasUserFailed && hasKillFailed) {
+        // If the kill failed and the user also thinks the session is invalid, restart it.
+        session.setIsIrrelevantForWm(reason);
+        return KillQueryResult.RESTART_REQUIRED;
+      }
+      // Otherwise, we can reuse the session. Either the kill has failed but the user managed to
+      // return early (in fact, can it fail because the query has completed earlier?), or the user
+      // has failed because the query was killed from under it.
+      return KillQueryResult.OK;
+    }
+
+    @Override
+    public String toString() {
+      return "KillQueryContext [isUserDone=" + isUserDone + ", isKillDone=" + isKillDone
+          + ", hasKillFailed=" + hasKillFailed + ", hasUserFailed=" + hasUserFailed
+          + ", session=" + session + ", reason=" + reason + "]";
+    }
+  }
+
   @VisibleForTesting
   TezSessionPool<WmTezSession> getTezAmPool() {
     return tezAmPool;
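
The heart of the new kill flow is KillQueryContext.process() above: the kill callback and
the user's return of the session each report success or failure, and the session goes back
to the AM pool only when neither path suggests the AM is suspect. The resolution rules,
restated as a standalone sketch (flag and enum names mirror the patch; the class itself is
illustrative):

    // Illustrative restatement of KillQueryContext.process() from the patch.
    enum KillQueryResult { OK, RESTART_REQUIRED, IN_PROGRESS }

    final class KillResolution {
      private boolean isUserDone, isKillDone, hasKillFailed, hasUserFailed;

      void onKillDone(boolean failed) { isKillDone = true; hasKillFailed = failed; }
      void onUserDone(boolean failed) { isUserDone = true; hasUserFailed = failed; }

      KillQueryResult process() {
        // Kill failed while the user still holds the session: restart the AM.
        if (!isUserDone && hasKillFailed) return KillQueryResult.RESTART_REQUIRED;
        // Otherwise wait until both the kill and the user have reported back.
        if (!isUserDone || !isKillDone) return KillQueryResult.IN_PROGRESS;
        // Both sides failed: the session is suspect, restart it.
        if (hasUserFailed && hasKillFailed) return KillQueryResult.RESTART_REQUIRED;
        // Any other combination means the session can be safely reused.
        return KillQueryResult.OK;
      }
    }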

http://git-wip-us.apache.org/repos/asf/hive/blob/25f3c3f9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index a73a24a..e220837 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -506,6 +506,8 @@ public class TestWorkloadManager {
             mapping("C", "C"), mapping("D", "D")));
     final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
+    TezSessionPool<WmTezSession> tezAmPool = wm.getTezAmPool();
+    assertEquals(6, tezAmPool.getCurrentSize());
  
     // A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
     // Total: 5/6 running.
@@ -525,6 +527,7 @@ public class TestWorkloadManager {
     checkError(error);
     assertEquals(0.3f, sessionC1.getClusterFraction(), EPSILON);
     assertEquals(0.3f, sessionD1.getClusterFraction(), EPSILON);
+    assertEquals(1, tezAmPool.getCurrentSize());
 
     // Change the resource plan - resize B and C down, D up, and remove A remapping users to B.
     // Everything will be killed in A and B, C won't change, D will start one more query from
@@ -549,14 +552,16 @@ public class TestWorkloadManager {
     assertEquals(0.3f, sessionA2.get().getClusterFraction(), EPSILON);
     assertEquals(0.2f, sessionC1.getClusterFraction(), EPSILON);
     assertEquals(0.25f, sessionD1.getClusterFraction(), EPSILON);
-
     assertKilledByWm(sessionA1);
     assertKilledByWm(sessionB1);
     assertKilledByWm(sessionB2);
+    assertEquals(0, tezAmPool.getCurrentSize());
 
     // Wait for another iteration to make sure event gets processed for D2 to receive allocation.
     sessionA2.get().returnToSessionManager();
     assertEquals(0.25f, sessionD2.get().getClusterFraction(), EPSILON);
+    // Return itself should be a no-op - the pool went from 6 to 4 with 1 session in the pool.
+
     sessionD2.get().returnToSessionManager();
     sessionC1.returnToSessionManager();
     sessionD1.returnToSessionManager();
@@ -564,8 +569,8 @@ public class TestWorkloadManager {
     // Try to "return" stuff that was killed from "under" us. Should be a no-op.
     sessionA1.returnToSessionManager();
     sessionB1.returnToSessionManager();
-    sessionB2.returnToSessionManager(); 
-    assertEquals(4, wm.getTezAmPool().getCurrentSize());
+    sessionB2.returnToSessionManager();
+    assertEquals(4, tezAmPool.getCurrentSize());
   }
 
   @Test(timeout=10000)
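
For completeness: the kill path exercised by these tests ultimately calls the KillQuery hook
obtained from the session (toKill.getKillQuery() in the WorkloadManager diff), invoking
kq.killQuery(queryId, reason) and reporting the outcome back to the master thread through
addKillQueryResult; a null hook or null query ID falls back to queueing an AM restart. One
way a test could observe that call is a recording stub along these lines (a sketch only -
this stub is not part of the patch, and only the killQuery(queryId, reason) signature seen
in the diff is assumed):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.session.KillQuery;

    // Illustrative test double: records the kill instead of performing it.
    class RecordingKillQuery implements KillQuery {
      final ConcurrentMap<String, String> killed = new ConcurrentHashMap<>();

      @Override
      public void killQuery(String queryId, String reason) throws HiveException {
        killed.put(queryId, reason);
      }
    }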