Posted to commits@hive.apache.org by sa...@apache.org on 2023/08/11 07:04:02 UTC
[hive] branch branch-3 updated: HIVE-27538: Backport HIVE-24201: WorkloadManager can support delayed move if destination pool does not have enough sessions (#4521)
This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 194420866bb HIVE-27538: Backport HIVE-24201: WorkloadManager can support delayed move if destination pool does not have enough sessions (#4521)
194420866bb is described below
commit 194420866bb631a84c116ac218201b55a4269100
Author: Aman Raj <10...@users.noreply.github.com>
AuthorDate: Fri Aug 11 12:33:55 2023 +0530
HIVE-27538: Backport HIVE-24201: WorkloadManager can support delayed move if destination pool does not have enough sessions (#4521)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
Closes (#4521)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 12 ++
.../ql/exec/tez/KillMoveTriggerActionHandler.java | 6 +-
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 12 ++
.../hadoop/hive/ql/exec/tez/WorkloadManager.java | 165 ++++++++++++++++++---
.../hive/ql/exec/tez/TestWorkloadManager.java | 159 ++++++++++++++++++++
5 files changed, 333 insertions(+), 21 deletions(-)
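In short: with hive.server2.wm.delayed.move enabled, a wm move trigger whose destination pool is full no longer kills the query immediately; the session keeps running in its source pool as a "delayed move" until the destination frees up, the source pool needs the capacity back, or the delayed-move timeout expires. A minimal, hypothetical distillation of that branching (illustrative names only; the real logic lives in WorkloadManager.handleMoveSessionOnMasterThread in the diff below):

    // Hypothetical sketch of the move-or-delay decision this patch introduces.
    public class DelayedMoveSketch {
      enum Outcome { MOVED, DELAYED, KILLED }

      // "forced" stands for a delayed move retried after timing out or to free up source capacity.
      static Outcome onMoveTrigger(boolean delayedMoveEnabled, boolean destHasCapacity, boolean forced) {
        if (delayedMoveEnabled && !destHasCapacity && !forced) {
          return Outcome.DELAYED; // keep running in the source pool for now
        }
        return destHasCapacity ? Outcome.MOVED : Outcome.KILLED;
      }

      public static void main(String[] args) {
        System.out.println(onMoveTrigger(true, false, false)); // DELAYED: wait in the source pool
        System.out.println(onMoveTrigger(true, true, true));   // MOVED: destination freed up
        System.out.println(onMoveTrigger(true, false, true));  // KILLED: forced while destination is still full
      }
    }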
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 96f44fae490..f9a47324473 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3138,6 +3138,18 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.SECONDS),
"The timeout for AM registry registration, after which (on attempting to use the\n" +
"session), we kill it and try to get another one."),
+ HIVE_SERVER2_WM_DELAYED_MOVE("hive.server2.wm.delayed.move", false,
+ "Determines the behavior of the WM move trigger when the destination pool is full.\n" +
+ "If true, the query keeps running in the source pool as long as possible;\n" +
+ "if false, the query is killed when the destination pool is full."),
+ HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT("hive.server2.wm.delayed.move.timeout", "3600",
+ new TimeValidator(TimeUnit.SECONDS),
+ "The amount of time a delayed move is allowed to run in the source pool,\n" +
+ "when a delayed move session times out, the session is moved to the destination pool.\n" +
+ "A value of 0 indicates no timeout"),
+ HIVE_SERVER2_WM_DELAYED_MOVE_VALIDATOR_INTERVAL("hive.server2.wm.delayed.move.validator.interval", "60",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Interval for checking for expired delayed moves."),
HIVE_SERVER2_TEZ_DEFAULT_QUEUES("hive.server2.tez.default.queues", "",
"A list of comma separated values corresponding to YARN queues of the same name.\n" +
"When HiveServer2 is launched in Tez mode, this configuration needs to be set\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
index b16f1c30a07..5eb1b69ede5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillMoveTriggerActionHandler.java
@@ -47,8 +47,10 @@ public class KillMoveTriggerActionHandler implements TriggerActionHandler<WmTezS
break;
case MOVE_TO_POOL:
String destPoolName = entry.getValue().getAction().getPoolName();
- Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
- moveFutures.put(wmTezSession, moveFuture);
+ if (!wmTezSession.isDelayedMove()) {
+ Future<Boolean> moveFuture = wm.applyMoveSessionAsync(wmTezSession, destPoolName);
+ moveFutures.put(wmTezSession, moveFuture);
+ }
break;
default:
throw new RuntimeException("Unsupported action: " + entry.getValue());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index fa2b02e5913..6004d712c4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -55,6 +55,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
@JsonProperty("queryId")
private String queryId;
private SettableFuture<Boolean> returnFuture = null;
+ private boolean isDelayedMove;
private final WorkloadManager wmParent;
@@ -72,6 +73,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
SessionExpirationTracker expiration, HiveConf conf) {
super(sessionId, parent, expiration, conf);
wmParent = parent;
+ isDelayedMove = false;
}
@VisibleForTesting
@@ -79,6 +81,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
SessionExpirationTracker expiration, HiveConf conf) {
super(sessionId, testParent, expiration, conf);
wmParent = null;
+ isDelayedMove = false;
}
public ListenableFuture<WmTezSession> waitForAmRegistryAsync(
@@ -168,6 +171,14 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
return poolName;
}
+ public void setDelayedMove(boolean isDelayedMove) {
+ this.isDelayedMove = isDelayedMove;
+ }
+
+ public boolean isDelayedMove() {
+ return isDelayedMove;
+ }
+
void setClusterFraction(double fraction) {
this.clusterFraction = fraction;
}
@@ -175,6 +186,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
void clearWm() {
this.poolName = null;
this.clusterFraction = null;
+ this.isDelayedMove = false;
}
public boolean hasClusterFraction() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index a8653c6e85d..e3976aca1a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -161,6 +161,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private final ExecutorService workPool;
/** Used to schedule timeouts for some async operations. */
private final ScheduledExecutorService timeoutPool;
+ /** Retries delayed moves and checks for expired delayed moves at the configured interval. */
+ private final Thread delayedMoveThread;
+ private final int delayedMoveTimeOutSec;
+ private final int delayedMoveValidationIntervalSec;
private LlapPluginEndpointClientImpl amComm;
@@ -235,6 +239,23 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
wmThread.setDaemon(true);
wmThread.start();
+ delayedMoveTimeOutSec = (int) HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT, TimeUnit.SECONDS);
+ delayedMoveValidationIntervalSec = (int) HiveConf.getTimeVar(conf,
+ ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE_VALIDATOR_INTERVAL, TimeUnit.SECONDS);
+
+ if ((HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) &&
+ (delayedMoveTimeOutSec > 0)) {
+ delayedMoveThread = new Thread(() -> runDelayedMoveThread(), "Workload management delayed move");
+ delayedMoveThread.setDaemon(true);
+ LOG.info("Starting delayed move timeout validator with interval: {} s; " +
+ "delayed move timeout is set to : {} s",
+ delayedMoveValidationIntervalSec, delayedMoveTimeOutSec);
+ delayedMoveThread.start();
+ } else {
+ delayedMoveThread = null;
+ }
+
updateResourcePlanAsync(plan).get(); // Wait for the initial resource plan to be applied.
objectMapper = new ObjectMapper();
@@ -292,6 +313,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (wmThread != null) {
wmThread.interrupt();
}
+
+ if (delayedMoveThread != null) {
+ delayedMoveThread.interrupt();
+ }
+
if (amComm != null) {
amComm.stop();
}
@@ -348,11 +374,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private final WmTezSession srcSession;
private final String destPool;
private final SettableFuture<Boolean> future;
+ private long startTime;
public MoveSession(final WmTezSession srcSession, final String destPool) {
this.srcSession = srcSession;
this.destPool = destPool;
this.future = SettableFuture.create();
+ this.startTime = System.currentTimeMillis();
}
@Override
@@ -416,6 +444,22 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
+ private void runDelayedMoveThread() {
+ while (true) {
+ try {
+ Thread.sleep(delayedMoveValidationIntervalSec * 1000);
+ } catch (InterruptedException ex) {
+ LOG.warn("WM Delayed Move thread was interrupted and will now exit");
+ return;
+ }
+ // Wake the WM master thread so it retries delayed moves and expires timed-out ones.
+ // Lock only after sleeping, so an interrupt can never lead to unlocking a lock we don't hold.
+ currentLock.lock();
+ try {
+ LOG.info("Retry delayed moves and check for expired delayed moves.");
+ notifyWmThreadUnderLock();
+ } finally {
+ currentLock.unlock();
+ }
+ }
+ }
+
private void scheduleWork(WmThreadSyncWork context) {
// Do the work that cannot be done via async calls.
@@ -631,8 +675,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Maybe change the command to support ... DELAYED MOVE TO etl ..., which will run under the src cluster fraction as long
// as possible.
Map<WmTezSession, WmEvent> recordMoveEvents = new HashMap<>();
+ boolean convertToDelayedMove = HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE);
for (MoveSession moveSession : e.moveSessions) {
- handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse, recordMoveEvents);
+ handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse,
+ recordMoveEvents, convertToDelayedMove);
}
e.moveSessions.clear();
@@ -648,7 +694,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
e.toReuse.clear();
- // 9. Resolve all the kill query requests in flight. Nothing below can affect them.
+ // 9. If delayed move is enabled, process the "delayed moves" now.
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE)) {
+ for (String poolName : pools.keySet()) {
+ processDelayedMovesForPool(poolName, poolsToRedistribute, recordMoveEvents, syncWork, e.toReuse);
+ }
+ }
+
+ // 10. Resolve all the kill query requests in flight. Nothing below can affect them.
Iterator<KillQueryContext> iter = killQueryInProgress.values().iterator();
while (iter.hasNext()) {
KillQueryContext ctx = iter.next();
@@ -684,14 +737,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- // 10. If there was a cluster state change, make sure we redistribute all the pools.
+ // 11. If there was a cluster state change, make sure we redistribute all the pools.
if (e.hasClusterStateChanged) {
LOG.info("Processing a cluster state change");
poolsToRedistribute.addAll(pools.keySet());
e.hasClusterStateChanged = false;
}
- // 11. Finally, for all the pools that have changes, promote queued queries and rebalance.
+ // 12. Finally, for all the pools that have changes, promote queued queries and rebalance.
for (String poolName : poolsToRedistribute) {
if (LOG.isDebugEnabled()) {
LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
@@ -699,19 +752,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
processPoolChangesOnMasterThread(poolName, hasRequeues, syncWork);
}
- // 12. Save state for future iterations.
+ // 13. Save state for future iterations.
for (KillQueryContext killCtx : syncWork.toKillQuery.values()) {
if (killQueryInProgress.put(killCtx.session, killCtx) != null) {
LOG.error("One query killed several times - internal error {}", killCtx.session);
}
}
- // 13. To record move events, we need to cluster fraction updates that happens at step 11.
+ // 14. To record move events, we need the cluster fraction updates that happen at step 12.
for (Map.Entry<WmTezSession, WmEvent> entry : recordMoveEvents.entrySet()) {
entry.getValue().endEvent(entry.getKey());
}
- // 14. Give our final state to UI/API requests if any.
+ // 15. Give our final state to UI/API requests if any.
if (e.dumpStateFuture != null) {
List<String> result = new ArrayList<>();
result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool);
@@ -721,8 +774,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
e.dumpStateFuture.set(result);
e.dumpStateFuture = null;
}
-
- // 15. Notify tests and global async ops.
+
+ // 16. Notify tests and global async ops.
if (e.testEvent != null) {
e.testEvent.set(true);
e.testEvent = null;
@@ -760,36 +813,63 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
- final WmThreadSyncWork syncWork,
- final HashSet<String> poolsToRedistribute,
- final Map<WmTezSession, GetRequest> toReuse,
- final Map<WmTezSession, WmEvent> recordMoveEvents) {
+ private static enum MoveSessionResult {
+ OK, // Normal case - the session was moved.
+ KILLED, // Killed because the destination pool was full and delayed move is disabled.
+ CONVERTED_TO_DELAYED_MOVE, // The move session was added to the pool's delayed moves because the destination
+ // pool was full and delayed move is enabled.
+ ERROR
+ }
+
+ private MoveSessionResult handleMoveSessionOnMasterThread(final MoveSession moveSession,
+ final WmThreadSyncWork syncWork,
+ final HashSet<String> poolsToRedistribute,
+ final Map<WmTezSession, GetRequest> toReuse,
+ final Map<WmTezSession, WmEvent> recordMoveEvents,
+ final boolean convertToDelayedMove) {
String destPoolName = moveSession.destPool;
- LOG.info("Handling move session event: {}", moveSession);
+ LOG.info("Handling move session event: {}, Convert to Delayed Move: {}", moveSession, convertToDelayedMove);
if (validMove(moveSession.srcSession, destPoolName)) {
+ String srcPoolName = moveSession.srcSession.getPoolName();
+ PoolState srcPool = pools.get(srcPoolName);
+ boolean capacityAvailableInDest = capacityAvailable(destPoolName);
+ // If delayed move is enabled and the destination pool doesn't have enough capacity, don't kill the query.
+ // Let the query run in the source pool and add the session to the source pool's delayed move sessions.
+ if (convertToDelayedMove && !capacityAvailableInDest) {
+ srcPool.delayedMoveSessions.add(moveSession);
+ moveSession.srcSession.setDelayedMove(true);
+ LOG.info("Converting Move: {} to a delayed move. Since destination pool {} is full, running in source pool {}"
+ + " as long as possible.", moveSession, destPoolName, srcPoolName);
+ moveSession.future.set(false);
+ return MoveSessionResult.CONVERTED_TO_DELAYED_MOVE;
+ }
WmEvent moveEvent = new WmEvent(WmEvent.EventType.MOVE);
// remove from src pool
+ if (moveSession.srcSession.isDelayedMove()) {
+ moveSession.srcSession.setDelayedMove(false);
+ }
RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
moveSession.srcSession, poolsToRedistribute, true, true);
if (rr == RemoveSessionResult.OK) {
// check if there is capacity in dest pool, if so move else kill the session
- if (capacityAvailable(destPoolName)) {
+ if (capacityAvailableInDest) {
// add to destination pool
Boolean added = checkAndAddSessionToAnotherPool(
moveSession.srcSession, destPoolName, poolsToRedistribute);
if (added != null && added) {
moveSession.future.set(true);
recordMoveEvents.put(moveSession.srcSession, moveEvent);
- return;
+ return MoveSessionResult.OK;
} else {
LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession);
}
} else {
WmTezSession session = moveSession.srcSession;
KillQueryContext killQueryContext = new KillQueryContext(session, "Destination pool " + destPoolName +
- " is full. Killing query.");
+ " is full. Killing query.");
resetAndQueueKill(syncWork.toKillQuery, killQueryContext, toReuse);
+ moveSession.future.set(false);
+ return MoveSessionResult.KILLED;
}
} else {
LOG.error("Failed to move session: {}. Session is not removed from its pool.", moveSession);
@@ -797,8 +877,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
} else {
LOG.error("Validation failed for move session: {}. Invalid move or session/pool got removed.", moveSession);
}
-
moveSession.future.set(false);
+ return MoveSessionResult.ERROR;
}
private Boolean capacityAvailable(final String destPoolName) {
@@ -816,6 +896,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
!srcSession.getPoolName().equalsIgnoreCase(destPool);
}
+ private boolean validDelayedMove(final MoveSession moveSession, final PoolState pool, final String poolName) {
+ return validMove(moveSession.srcSession, moveSession.destPool) &&
+ moveSession.srcSession.isDelayedMove() &&
+ moveSession.srcSession.getPoolName().equalsIgnoreCase(poolName) &&
+ pool.getSessions().contains(moveSession.srcSession);
+ }
+
// ========= Master thread methods
private void handleInitResultOnMasterThread(
@@ -1218,6 +1305,44 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
+ private void processDelayedMovesForPool(final String poolName, final HashSet<String> poolsToRedistribute, final Map<WmTezSession, WmEvent> recordMoveEvents,
+ WmThreadSyncWork syncWork, IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+ long currentTime = System.currentTimeMillis();
+ PoolState pool = pools.get(poolName);
+ int movedCount = 0;
+ int queueSize = pool.queue.size();
+ int remainingCapacity = pool.queryParallelism - pool.getTotalActiveSessions();
+ int delayedMovesToProcess = (queueSize > remainingCapacity) ? (queueSize - remainingCapacity) : 0;
+ Iterator<MoveSession> iter = pool.delayedMoveSessions.iterator();
+ while (iter.hasNext()) {
+ MoveSession moveSession = iter.next();
+ MoveSessionResult result;
+
+ // Discard the delayed move if it is no longer valid.
+ if (!validDelayedMove(moveSession, pool, poolName)) {
+ iter.remove();
+ continue;
+ }
+
+ // Process the delayed move if:
+ // 1. the delayed move has timed out, or
+ // 2. the destination pool has freed up, or
+ // 3. the source pool has incoming requests and we need to free up capacity in the source pool
+ // to accommodate them.
+ if ((movedCount < delayedMovesToProcess) || (capacityAvailable(moveSession.destPool))
+ || ((delayedMoveTimeOutSec > 0) && ((currentTime - moveSession.startTime) >= delayedMoveTimeOutSec * 1000))) {
+ LOG.info("Processing delayed move {} for pool {}", moveSession, poolName);
+ result = handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, toReuse, recordMoveEvents,
+ false);
+ LOG.info("Result of processing delayed move {} for pool {}: {}", moveSession, poolName, result);
+ iter.remove();
+ if ((result == MoveSessionResult.OK) || (result == MoveSessionResult.KILLED)) {
+ movedCount++;
+ }
+ }
+ }
+ }
+
private void returnSessionOnFailedReuse(
GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
WmTezSession session = req.sessionToReuse;
@@ -1726,6 +1851,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Note: the list is expected to be a few items; if it's longer we may want an IHM.
private final LinkedList<WmTezSession> sessions = new LinkedList<>();
private final LinkedList<GetRequest> queue = new LinkedList<>();
+ private final LinkedList<MoveSession> delayedMoveSessions = new LinkedList<MoveSession>();
private final WmPoolMetrics metrics;
private final String fullName;
@@ -1854,6 +1980,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
IdentityHashMap<WmTezSession, GetRequest> toReuse,
WmThreadSyncWork syncWork) {
int totalCount = sessions.size() + initializingSessions.size();
+ delayedMoveSessions.clear();
for (WmTezSession sessionToKill : sessions) {
resetRemovedSessionToKill(syncWork.toKillQuery,
new KillQueryContext(sessionToKill, killReason), toReuse);
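The budget computed at the top of processDelayedMovesForPool decides how many delayed moves must be forced out of a pool to admit its queued queries. A standalone rendering of that arithmetic (hypothetical class; the formula matches the patch):

    // Hypothetical standalone copy of the capacity arithmetic in processDelayedMovesForPool.
    public class DelayedMoveBudget {
      static int delayedMovesToProcess(int queueSize, int queryParallelism, int totalActiveSessions) {
        int remainingCapacity = queryParallelism - totalActiveSessions;
        return (queueSize > remainingCapacity) ? (queueSize - remainingCapacity) : 0;
      }

      public static void main(String[] args) {
        // Parallelism 2, 1 active session, 2 queued requests -> 1 delayed move is forced out.
        System.out.println(delayedMovesToProcess(2, 2, 1)); // 1
        // An empty queue never forces a delayed move out early.
        System.out.println(delayedMovesToProcess(0, 2, 2)); // 0
      }
    }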
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 6e15b2c0070..7f417627476 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -1102,6 +1102,156 @@ public class TestWorkloadManager {
assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
}
+ @Test(timeout=10000)
+ public void testDelayedMoveSessions() throws Exception {
+ final HiveConf conf = createConfForDelayedMove();
+ MockQam qam = new MockQam();
+ WMFullResourcePlan plan = new WMFullResourcePlan(plan(), Lists.newArrayList(
+ pool("A", 2, 0.6f), pool("B", 1, 0.4f)));
+ plan.setMappings(Lists.newArrayList(mapping("A", "A"), mapping("B", "B")));
+ final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ wm.start();
+
+ WmTezSession sessionA1 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+
+ // [A: 1, B: 0]
+ Map<String, SessionTriggerProvider> allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(1, allSessionProviders.get("A").getSessions().size());
+ assertEquals(0, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA1));
+ assertFalse(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.6f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA1.getPoolName());
+
+ // If dest pool has capacity, move immediately
+ // [A: 0, B: 1]
+ Future<Boolean> future = wm.applyMoveSessionAsync(sessionA1, "B");
+ assertNotNull(future.get());
+ assertTrue(future.get());
+ wm.addTestEvent().get();
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(0, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertFalse(allSessionProviders.get("A").getSessions().contains(sessionA1));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("B", sessionA1.getPoolName());
+
+ WmTezSession sessionA2 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+ // [A: 1, B: 1]
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(1, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON);
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA2.getPoolName());
+ assertEquals("B", sessionA1.getPoolName());
+
+ // Dest pool is maxed out. Keep running in source pool
+ // [A: 1, B: 1]
+ future = wm.applyMoveSessionAsync(sessionA2, "B");
+ assertNotNull(future.get());
+ assertFalse(future.get());
+ wm.addTestEvent().get();
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(1, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA2));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.6f, sessionA2.getClusterFraction(), EPSILON);
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA2.getPoolName());
+ assertEquals("B", sessionA1.getPoolName());
+
+ // A has queued requests. The new requests should get accepted. The delayed move should be killed
+ WmTezSession sessionA3 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+ WmTezSession sessionA4 = (WmTezSession) wm.getSession(null, mappingInput("A"), conf);
+
+ while (sessionA2.isOpen()) {
+ Thread.sleep(100);
+ }
+ assertNull(sessionA2.getPoolName());
+ assertEquals("Destination pool B is full. Killing query.", sessionA2.getReasonForKill());
+
+ // [A: 2, B: 1]
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(2, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3));
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA4));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
+ assertEquals(0.3f, sessionA4.getClusterFraction(), EPSILON);
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA3.getPoolName());
+ assertEquals("A", sessionA4.getPoolName());
+ assertEquals("B", sessionA1.getPoolName());
+
+
+ // Timeout
+ // Attempt to move to pool B which is full. Keep running in source pool as a "delayed move".
+ future = wm.applyMoveSessionAsync(sessionA3, "B");
+ assertNotNull(future.get());
+ assertFalse(future.get());
+ wm.addTestEvent().get();
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertEquals(2, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA3));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.3f, sessionA3.getClusterFraction(), EPSILON);
+ assertEquals(0.3f, sessionA4.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA3.getPoolName());
+ assertEquals("A", sessionA4.getPoolName());
+ assertEquals("B", sessionA1.getPoolName());
+
+ // Sleep till the delayed move times out and the move is attempted again.
+ // The query should be killed during the move since destination pool B is still full.
+ Thread.sleep(2000);
+ while (sessionA3.isOpen()) {
+ Thread.sleep(1000);
+ }
+ // [A:1, B:1]
+ assertNull(sessionA3.getPoolName());
+ assertEquals("Destination pool B is full. Killing query.", sessionA3.getReasonForKill());
+ assertEquals(1, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA4));
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertEquals(0.6f, sessionA4.getClusterFraction(), EPSILON);
+ assertEquals(0.4f, sessionA1.getClusterFraction(), EPSILON);
+ assertEquals("A", sessionA4.getPoolName());
+ assertEquals("B", sessionA1.getPoolName());
+
+ // Retry
+ // Create another delayed move in A
+ future = wm.applyMoveSessionAsync(sessionA4, "B");
+ assertNotNull(future.get());
+ assertFalse(future.get());
+ wm.addTestEvent().get();
+
+ assertEquals("A", sessionA4.getPoolName());
+ assertTrue(allSessionProviders.get("A").getSessions().contains(sessionA4));
+ assertEquals(1, allSessionProviders.get("A").getSessions().size());
+
+ // Free up pool B.
+ wm.returnAfterUse(sessionA1);
+ wm.addTestEvent().get();
+ allSessionProviders = wm.getAllSessionTriggerProviders();
+ assertFalse(allSessionProviders.get("B").getSessions().contains(sessionA1));
+ assertNull(sessionA1.getPoolName());
+
+ // The delayed move is successfully retried since destination pool has freed up.
+ // [A:0 B:1]
+ assertEquals(0, allSessionProviders.get("A").getSessions().size());
+ assertEquals(1, allSessionProviders.get("B").getSessions().size());
+ assertTrue(allSessionProviders.get("B").getSessions().contains(sessionA4));
+ assertEquals(0.4f, sessionA4.getClusterFraction(), EPSILON);
+ assertEquals("B", sessionA4.getPoolName());
+ }
+
@Test(timeout=10000)
public void testAsyncSessionInitFailures() throws Exception {
final HiveConf conf = createConf();
@@ -1245,4 +1395,13 @@ public class TestWorkloadManager {
conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, "");
return conf;
}
+
+ private HiveConf createConfForDelayedMove() {
+ HiveConf conf = createConf();
+ conf.set(ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE.varname, "true");
+ conf.set(ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE_TIMEOUT.varname, "2");
+ conf.set(ConfVars.HIVE_SERVER2_WM_DELAYED_MOVE_VALIDATOR_INTERVAL.varname, "1");
+ return conf;
+ }
}
+