You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2023/08/11 07:04:02 UTC

[hive] branch branch-3 updated: HIVE-27538: Backport HIVE-24201: WorkloadManager can support delayed move if destination pool does not have enough sessions (#4521)

This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 194420866bb HIVE-27538: Backport HIVE-24201: WorkloadManager can support delayed move if destination pool does not have enough sessions (#4521)
194420866bb is described below

commit 194420866bb631a84c116ac218201b55a4269100
Author: Aman Raj <10...@users.noreply.github.com>
AuthorDate: Fri Aug 11 12:33:55 2023 +0530

    HIVE-27538: Backport HIVE-24201: WorkloadManager can support delayed move if destination pool does not have enough sessions (#4521)
    
    Signed-off-by: Sankar Hariappan <sa...@apache.org>
    Closes (#4521)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  12 ++
 .../ql/exec/tez/KillMoveTriggerActionHandler.java  |   6 +-
 .../hadoop/hive/ql/exec/tez/WmTezSession.java      |  12 ++
 .../hadoop/hive/ql/exec/tez/WorkloadManager.java   | 165 ++++++++++++++++++---
 .../hive/ql/exec/tez/TestWorkloadManager.java      | 159 ++++++++++++++++++++
 5 files changed, 333 insertions(+), 21 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 96f44fae490..f9a47324473 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3138,6 +3138,18 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.SECONDS),
         "The timeout for AM registry registration, after which (on attempting to use the\n" +
         "session), we kill it and try to get another one."),
+    HIVE_SERVER2_WM_DELAYED_MOVE("hive.server2.wm.delayed.move", false,
+        "Determines behavior of the wm move trigger when destination pool is full.\n" +
+        "If true, the query will run in source pool as long as possible if destination pool is full;\n" +
+        "if false, the query will be killed if destination pool is full."),
+    HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT("hive.server2.wm.delayed.move.timeout", "3600",
+        new TimeValidator(TimeUnit.SECONDS),
+        "The amount of time a delayed move is allowed to run in the source pool,\n" +
+        "when a delayed move session times out, the session is moved to the destination pool.\n" +
+        "A value of 0 indicates no timeout"),
+    HIVE_SERVER2_WM_DELAYED_MOVE_VALIDATOR_INTERVAL("hive.server2.wm.delayed.move.validator.interval", "60",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Interval for checking for expired delayed moves."),
     HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "",
         "A list of comma separated values corresponding to YARN queues of the same name.\n" +
         "When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
index b16f1c30a07..5eb1b69ede5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
@@ -47,8 +47,10 @@ public class KillMoveTriggerActionHandler implements TriggerActionHandler<WmTezS
           break;
         case MOVE_TO_POOL:
           String destPoolName = entry.getValue().getAction().getPoolName();
-          Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
-          moveFutures.put(wmTezSession, moveFuture);
+          if (!wmTezSession.isDelayedMove()) {
+            Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
+            moveFutures.put(wmTezSession, moveFuture);
+          }
           break;
         default:
           throw new RuntimeException("Unsupported action: " + entry.getValue());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index fa2b02e5913..6004d712c4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -55,6 +55,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   @JsonProperty("queryId")
   private String queryId;
   private SettableFuture<Boolean> returnFuture = null;
+  private boolean isDelayedMove;
 
   private final WorkloadManager wmParent;
 
@@ -72,6 +73,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
       SessionExpirationTracker expiration, HiveConf conf) {
     super(sessionId, parent, expiration, conf);
     wmParent = parent;
+    isDelayedMove = false;
   }
 
   @VisibleForTesting
@@ -79,6 +81,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
       SessionExpirationTracker expiration, HiveConf conf) {
     super(sessionId, testParent, expiration, conf);
     wmParent = null;
+    isDelayedMove = false;
   }
 
   public ListenableFuture<WmTezSession> waitForAmRegistryAsync(
@@ -168,6 +171,14 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     return poolName;
   }
 
+  public void setDelayedMove(boolean isDelayedMove) { // Set by WorkloadManager when a move of this session is deferred or carried out.
+    this.isDelayedMove = isDelayedMove;
+  }
+
+  public boolean isDelayedMove() { // While true, KillMoveTriggerActionHandler does not issue another move for this session.
+    return isDelayedMove;
+  }
+
   void setClusterFraction(double fraction) {
     this.clusterFraction = fraction;
   }
@@ -175,6 +186,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   void clearWm() {
     this.poolName = null;
     this.clusterFraction = null;
+    this.isDelayedMove = false; // a session with no pool can no longer be a pending delayed move
   }
 
   public boolean hasClusterFraction() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index a8653c6e85d..e3976aca1a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -161,6 +161,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   private final ExecutorService workPool;
   /** Used to schedule timeouts for some async operations. */
   private final ScheduledExecutorService timeoutPool;
+  /** Periodically retries delayed moves and checks for expired delayed moves. */
+  private final Thread delayedMoveThread;
+  private final int delayedMoveTimeOutSec;
+  private final int delayedMoveValidationIntervalSec;
 
   private LlapPluginEndpointClientImpl amComm;
 
@@ -235,6 +239,23 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     wmThread.setDaemon(true);
     wmThread.start();
 
+    delayedMoveTimeOutSec = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT, TimeUnit.SECONDS);
+    delayedMoveValidationIntervalSec = (int) HiveConf.getTimeVar(conf,
+        ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE_VALIDATOR_INTERVAL, TimeUnit.SECONDS);
+
+    if ((HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) &&
+        (delayedMoveTimeOutSec > 0)) {
+      delayedMoveThread = new Thread(() -> runDelayedMoveThread(), "Workload management delayed move");
+      delayedMoveThread.setDaemon(true);
+      LOG.info("Starting delayed move timeout validator with interval: {} s; " +
+              "delayed move timeout is set to : {} s",
+               delayedMoveValidationIntervalSec, delayedMoveTimeOutSec);
+      delayedMoveThread.start();
+    } else {
+      delayedMoveThread = null;
+    }
+
     updateResourcePlanAsync(plan).get(); // Wait for the initial resource plan to be applied.
 
     objectMapper = new ObjectMapper();
@@ -292,6 +313,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     if (wmThread != null) {
       wmThread.interrupt();
     }
+
+    if (delayedMoveThread != null) {
+      delayedMoveThread.interrupt();
+    }
+
     if (amComm != null) {
       amComm.stop();
     }
@@ -348,11 +374,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private final WmTezSession srcSession;
     private final String destPool;
     private final SettableFuture<Boolean> future;
+    private long startTime;
 
     public MoveSession(final WmTezSession srcSession, final String destPool) {
       this.srcSession = srcSession;
       this.destPool = destPool;
       this.future = SettableFuture.create();
+      this.startTime = System.currentTimeMillis(); // compared against the delayed-move timeout in processDelayedMovesForPool
     }
 
     @Override
@@ -416,6 +444,22 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  private void runDelayedMoveThread() { // Wakes the WM master thread each interval to retry/expire delayed moves.
+    while (true) {
+      try {
+        Thread.sleep(delayedMoveValidationIntervalSec * 1000L); // long math avoids int overflow on large intervals
+      } catch (InterruptedException ex) { // e.g. interrupted during WM shutdown; the lock is not held here
+        LOG.warn("WM Delayed Move thread was interrupted and will now exit");
+        return;
+      }
+      currentLock.lock(); // taken only after sleeping, so the unlock() below always pairs with a held lock
+      try {
+        LOG.info("Retry delayed moves and check for expired delayed moves.");
+        notifyWmThreadUnderLock();
+      } finally { currentLock.unlock(); }
+    }
+  }
+
   private void scheduleWork(WmThreadSyncWork context) {
     // Do the work that cannot be done via async calls.
 
@@ -631,8 +675,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long
     // as possible
     Map<WmTezSession, WmEvent> recordMoveEvents = new HashMap<>();
+    boolean convertToDelayedMove = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE);
     for (MoveSession moveSession : e.moveSessions) {
-      handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse, recordMoveEvents);
+      handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse,
+          recordMoveEvents, convertToDelayedMove);
     }
     e.moveSessions.clear();
 
@@ -648,7 +694,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
     e.toReuse.clear();
 
-    // 9. Resolve all the kill query requests in flight. Nothing below can affect them.
+    // 9. If delayed move is set to true, process the "delayed moves" now.
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+      for (String poolName : pools.keySet()) {
+        processDelayedMovesForPool(poolName, poolsToRedistribute, recordMoveEvents, syncWork, e.toReuse);
+      }
+    }
+
+    // 10. Resolve all the kill query requests in flight. Nothing below can affect them.
     Iterator<KillQueryContext> iter = killQueryInProgress.values().iterator();
     while (iter.hasNext()) {
       KillQueryContext ctx = iter.next();
@@ -684,14 +737,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       }
     }
 
-    // 10. If there was a cluster state change, make sure we redistribute all the pools.
+    // 11. If there was a cluster state change, make sure we redistribute all the pools.
     if (e.hasClusterStateChanged) {
       LOG.info("Processing a cluster state change");
       poolsToRedistribute.addAll(pools.keySet());
       e.hasClusterStateChanged = false;
     }
 
-    // 11. Finally, for all the pools that have changes, promote queued queries and rebalance.
+    // 12. Finally, for all the pools that have changes, promote queued queries and rebalance.
     for (String poolName : poolsToRedistribute) {
       if (LOG.isDebugEnabled()) {
         LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
@@ -699,19 +752,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork);
     }
 
-    // 12. Save state for future iterations.
+    // 13. Save state for future iterations.
     for (KillQueryContext killCtx : syncWork.toKillQuery.values()) {
       if (killQueryInProgress.put(killCtx.session, killCtx) != null) {
         LOG.error("One query killed several times - internal error {}", killCtx.session);
       }
     }
 
-    // 13. To record move events, we need to cluster fraction updates that happens at step 11.
+    // 14. Record move events; this must run after the cluster fraction updates made in step 12.
     for (Map.Entry<WmTezSession, WmEvent> entry : recordMoveEvents.entrySet()) {
       entry.getValue().endEvent(entry.getKey());
     }
 
-    // 14. Give our final state to UI/API requests if any.
+    // 15. Give our final state to UI/API requests if any.
     if (e.dumpStateFuture != null) {
       List<String> result = new ArrayList<>();
       result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool);
@@ -721,8 +774,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       e.dumpStateFuture.set(result);
       e.dumpStateFuture = null;
     }
-    
-    // 15. Notify tests and global async ops.
+
+    // 16. Notify tests and global async ops.
     if (e.testEvent != null) {
       e.testEvent.set(true);
       e.testEvent = null;
@@ -760,36 +813,63 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
-    final WmThreadSyncWork syncWork,
-    final HashSet<String> poolsToRedistribute,
-    final Map<WmTezSession, GetRequest> toReuse,
-    final Map<WmTezSession, WmEvent> recordMoveEvents) {
+  private static enum MoveSessionResult { // Outcome reported by handleMoveSessionOnMasterThread.
+    OK, // Normal case - the session was moved.
+    KILLED, // Killed because destination pool was full and delayed move is false.
+    CONVERTED_TO_DELAYED_MOVE, // the move session was added to the pool's delayed moves as the dest. pool was full
+    // and delayed move is true.
+    ERROR // validation failed, or the session could not be removed from its pool / added to the destination.
+  }
+
+  private MoveSessionResult handleMoveSessionOnMasterThread(final MoveSession moveSession, // Moves, parks, or kills; reports which.
+      final WmThreadSyncWork syncWork,
+      final HashSet<String> poolsToRedistribute,
+      final Map<WmTezSession, GetRequest> toReuse,
+      final Map<WmTezSession, WmEvent> recordMoveEvents,
+      final boolean convertToDelayedMove) { // true: park in the source pool instead of killing when dest is full
     String destPoolName = moveSession.destPool;
-    LOG.info("Handling move session event: {}", moveSession);
+    LOG.info("Handling move session event: {}, Convert to Delayed Move: {}", moveSession, convertToDelayedMove);
     if (validMove(moveSession.srcSession, destPoolName)) {
+      String srcPoolName = moveSession.srcSession.getPoolName();
+      PoolState srcPool = pools.get(srcPoolName);
+      boolean capacityAvailableInDest = capacityAvailable(destPoolName); // evaluated before the session leaves its source pool
+      // If delayed move is set to true and if destination pool doesn't have enough capacity, don't kill the query.
+      // Let the query run in source pool. Add the session to the source pool's delayed move sessions.
+      if (convertToDelayedMove && !capacityAvailableInDest) {
+        srcPool.delayedMoveSessions.add(moveSession);
+        moveSession.srcSession.setDelayedMove(true);
+        LOG.info("Converting Move: {} to a delayed move. Since destination pool {} is full, running in source pool {}"
+            + " as long as possible.", moveSession, destPoolName, srcPoolName);
+        moveSession.future.set(false); // the caller's future reports "not moved (yet)"
+        return MoveSessionResult.CONVERTED_TO_DELAYED_MOVE;
+      }
       WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
       // remove from src pool
+      if (moveSession.srcSession.isDelayedMove()) { // the deferred move is being resolved now; clear the flag
+        moveSession.srcSession.setDelayedMove(false);
+      }
       RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
           moveSession.srcSession, poolsToRedistribute, true, true);
       if (rr == RemoveSessionResult.OK) {
         // check if there is capacity in dest pool, if so move else kill the session
-        if (capacityAvailable(destPoolName)) {
+        if (capacityAvailableInDest) {
           // add to destination pool
           Boolean added = checkAndAddSessionToAnotherPool(
               moveSession.srcSession, destPoolName, poolsToRedistribute);
           if (added != null && added) {
             moveSession.future.set(true);
             recordMoveEvents.put(moveSession.srcSession, moveEvent);
-            return;
+            return MoveSessionResult.OK;
           } else {
             LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession);
           }
         } else {
           WmTezSession session = moveSession.srcSession;
           KillQueryContext killQueryContext = new KillQueryContext(session, "Destination pool " + destPoolName +
-            " is full. Killing query.");
+             " is full. Killing query.");
           resetAndQueueKill(syncWork.toKillQuery, killQueryContext, toReuse);
+          moveSession.future.set(false);
+          return MoveSessionResult.KILLED;
         }
       } else {
         LOG.error("Failed to move session: {}. Session is not removed from its pool.", moveSession);
@@ -797,8 +877,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     } else {
       LOG.error("Validation failed for move session: {}. Invalid move or session/pool got removed.", moveSession);
     }
-
     moveSession.future.set(false);
+    return MoveSessionResult.ERROR; // reached only on the failure paths logged above
   }
 
   private Boolean capacityAvailable(final String destPoolName) {
@@ -816,6 +896,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       !srcSession.getPoolName().equalsIgnoreCase(destPool);
   }
 
+  private boolean validDelayedMove(final MoveSession moveSession, final PoolState pool, final String poolName) { // Is the parked move still actionable?
+    return  validMove(moveSession.srcSession, moveSession.destPool) &&
+        moveSession.srcSession.isDelayedMove() && // still flagged as deferred
+        moveSession.srcSession.getPoolName().equalsIgnoreCase(poolName) && // still assigned to this pool
+        pool.getSessions().contains(moveSession.srcSession); // and still among the pool's sessions
+  }
+
   // ========= Master thread methods
 
   private void handleInitResultOnMasterThread(
@@ -1218,6 +1305,44 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  private void processDelayedMovesForPool(final String poolName, final HashSet<String> poolsToRedistribute, final Map<WmTezSession, WmEvent> recordMoveEvents,
+      WmThreadSyncWork syncWork, IdentityHashMap<WmTezSession, GetRequest> toReuse) { // Retries, expires or drops this pool's parked moves.
+    long currentTime = System.currentTimeMillis();
+    PoolState pool = pools.get(poolName);
+    int movedCount = 0;
+    int queueSize = pool.queue.size();
+    int remainingCapacity = pool.queryParallelism - pool.getTotalActiveSessions();
+    int delayedMovesToProcess = (queueSize > remainingCapacity) ? (queueSize - remainingCapacity) : 0; // queued requests beyond free capacity
+    Iterator<MoveSession> iter = pool.delayedMoveSessions.iterator();
+    while (iter.hasNext()) {
+      MoveSession moveSession = iter.next();
+      MoveSessionResult result;
+
+      // Discard the delayed move if invalid
+      if (!validDelayedMove(moveSession, pool, poolName)) {
+        iter.remove();
+        continue;
+      }
+
+      // Process the delayed move if
+      // 1. The delayed move has timed out (compared in long millis so large timeouts cannot overflow) or
+      // 2. The destination pool has freed up or
+      // 3. If the source pool has incoming requests and we need to free up capacity in the source pool
+      // to accommodate these requests.
+      if ((movedCount < delayedMovesToProcess) || (capacityAvailable(moveSession.destPool))
+          || ((delayedMoveTimeOutSec > 0) && ((currentTime - moveSession.startTime) >= delayedMoveTimeOutSec * 1000L))) {
+        LOG.info("Processing delayed move {} for pool {}", moveSession, poolName);
+        result = handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, toReuse, recordMoveEvents,
+            false); // false: do not park again; the query is killed if the destination is still full
+        LOG.info("Result of processing delayed move {} for pool {}: {}", moveSession, poolName, result);
+        iter.remove();
+        if ((result == MoveSessionResult.OK) || (result == MoveSessionResult.KILLED)) {
+          movedCount++;
+        }
+      }
+    }
+  }
+
   private void returnSessionOnFailedReuse(
       GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
     WmTezSession session = req.sessionToReuse;
@@ -1726,6 +1851,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // Note: the list is expected to be a few items; if it's longer we may want an IHM.
     private final LinkedList<WmTezSession> sessions = new LinkedList<>();
     private final LinkedList<GetRequest> queue = new LinkedList<>();
+    private final LinkedList<MoveSession> delayedMoveSessions = new LinkedList<MoveSession>();
     private final WmPoolMetrics metrics;
 
     private final String fullName;
@@ -1854,6 +1980,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         IdentityHashMap<WmTezSession, GetRequest> toReuse,
         WmThreadSyncWork syncWork) {
       int totalCount = sessions.size() + initializingSessions.size();
+      delayedMoveSessions.clear();
       for (WmTezSession sessionToKill : sessions) {
         resetRemovedSessionToKill(syncWork.toKillQuery,
           new KillQueryContext(sessionToKill, killReason), toReuse);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 6e15b2c0070..7f417627476 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -1102,6 +1102,156 @@ public class TestWorkloadManager {
     assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
   }
 
+  @Test(timeout=10000)
+  public void testDelayedMoveSessions() throws Exception { // Immediate move, kill on source-pool pressure, timeout, and retry.
+    final HiveConf conf = createConfForDelayedMove();
+    MockQam qam = new MockQam();
+    WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+        pool("A", 2, 0.6f), pool("B", 1, 0.4f)));
+    plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
+    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    wm.start();
+
+    WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+
+    // [A: 1, B: 0]
+    Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+    assertEquals(0, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA1));
+    assertFalse(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.6f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA1.getPoolName());
+
+    // If dest pool has capacity, move immediately
+    // [A: 0, B: 1]
+    Future<Boolean> future = wm.applyMoveSessionAsync(sessionA1, "B");
+    assertNotNull(future.get());
+    assertTrue(future.get());
+    wm.addTestEvent().get();
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(0, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("B", sessionA1.getPoolName());
+
+    WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+    // [A: 1, B: 1]
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON);
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA2.getPoolName());
+    assertEquals("B", sessionA1.getPoolName());
+
+    // Dest pool is maxed out. Keep running in source pool
+    // [A: 1, B: 1]
+    future = wm.applyMoveSessionAsync(sessionA2, "B");
+    assertNotNull(future.get());
+    assertFalse(future.get());
+    wm.addTestEvent().get();
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON);
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA2.getPoolName());
+    assertEquals("B", sessionA1.getPoolName());
+
+    // A has queued requests. The new requests should get accepted. The delayed move should be killed
+    WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+    WmTezSession sessionA4 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+
+    while(sessionA2.isOpen()) {
+      Thread.sleep(100);
+    }
+    assertNull(sessionA2.getPoolName());
+    assertEquals("Destination pool B is full. Killing query.", sessionA2.getReasonForKill());
+
+    // [A: 2, B: 1]
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(2, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3));
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA4));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
+    assertEquals(0.3f, sessionA4.getClusterFraction(), EPSILON);
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA3.getPoolName());
+    assertEquals("A", sessionA4.getPoolName());
+    assertEquals("B", sessionA1.getPoolName());
+
+
+    // Timeout
+    // Attempt to move to pool B which is full. Keep running in source pool as a "delayed move".
+    future = wm.applyMoveSessionAsync(sessionA3, "B");
+    assertNotNull(future.get());
+    assertFalse(future.get());
+    wm.addTestEvent().get();
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertEquals(2, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
+    assertEquals(0.3f, sessionA4.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA3.getPoolName());
+    assertEquals("A", sessionA4.getPoolName());
+    assertEquals("B", sessionA1.getPoolName());
+
+    // Sleep till the delayed move times out and the move is attempted again.
+    // The query should be killed during the move since destination pool B is still full.
+    Thread.sleep(2000); // matches the 2s delayed-move timeout set in createConfForDelayedMove
+    while (sessionA3.isOpen()) {
+      Thread.sleep(1000);
+    }
+    // [A:1, B:1]
+    assertNull(sessionA3.getPoolName());
+    assertEquals("Destination pool B is full. Killing query.", sessionA3.getReasonForKill());
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA4));
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertEquals(0.6f, sessionA4.getClusterFraction(), EPSILON);
+    assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+    assertEquals("A", sessionA4.getPoolName());
+    assertEquals("B", sessionA1.getPoolName());
+
+    // Retry
+    // Create another delayed move in A
+    future = wm.applyMoveSessionAsync(sessionA4, "B");
+    assertNotNull(future.get());
+    assertFalse(future.get());
+    wm.addTestEvent().get();
+
+    assertEquals("A", sessionA4.getPoolName());
+    assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA4));
+    assertEquals(1, allSessionProviders.get("A").getSessions().size());
+
+    // Free up pool B.
+    wm.returnAfterUse(sessionA1);
+    wm.addTestEvent().get();
+    allSessionProviders = wm.getAllSessionTriggerProviders();
+    assertFalse(allSessionProviders.get("B").getSessions().contains(sessionA1));
+    assertNull(sessionA1.getPoolName());
+
+    // The delayed move is successfully retried since destination pool has freed up.
+    // [A:0 B:1]
+    assertEquals(0, allSessionProviders.get("A").getSessions().size());
+    assertEquals(1, allSessionProviders.get("B").getSessions().size());
+    assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA4));
+    assertEquals(0.4f, sessionA4.getClusterFraction(), EPSILON);
+    assertEquals("B", sessionA4.getPoolName());
+  }
+
   @Test(timeout=10000)
   public void testAsyncSessionInitFailures() throws Exception {
     final HiveConf conf = createConf();
@@ -1245,4 +1395,13 @@ public class TestWorkloadManager {
     conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, "");
     return conf;
   }
+
+  private HiveConf createConfForDelayedMove() { // createConf() plus delayed moves with short (seconds) timeout/interval.
+    HiveConf conf = createConf();
+    conf.set(ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE.varname, "true");
+    conf.set(ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT.varname, "2"); // 2 s before a parked move is forced
+    conf.set(ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE_VALIDATOR_INTERVAL.varname, "1"); // validator wakes every 1 s
+    return conf;
+  }
 }
+