You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/14 20:49:39 UTC
hive git commit: HIVE-17906 : use kill query mechanics to kill
queries in WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 3bfcfdde0 -> 25f3c3f96
HIVE-17906 : use kill query mechanics to kill queries in WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/25f3c3f9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/25f3c3f9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/25f3c3f9
Branch: refs/heads/master
Commit: 25f3c3f9673dc48b648b8c25ba28457139261c0e
Parents: 3bfcfdd
Author: sergey <se...@apache.org>
Authored: Tue Nov 14 12:44:49 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Tue Nov 14 12:44:49 2017 -0800
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/tez/TezSessionPool.java | 2 +-
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 29 +-
.../hive/ql/exec/tez/WorkloadManager.java | 522 ++++++++++++++-----
.../hive/ql/exec/tez/TestWorkloadManager.java | 11 +-
4 files changed, 428 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/25f3c3f9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 03a0682..2bded71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -459,7 +459,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
@VisibleForTesting
int getCurrentSize() {
- poolLock.tryLock();
+ poolLock.lock();
try {
return pool.size();
} finally {
http://git-wip-us.apache.org/repos/asf/hive/blob/25f3c3f9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 6cf2aad..b770d71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -32,12 +32,17 @@ import org.apache.hadoop.hive.registry.impl.TezAmInstance;
public class WmTezSession extends TezSessionPoolSession implements AmPluginNode {
private String poolName;
private double clusterFraction;
+ /**
+ * The reason to kill an AM. Note that this is for the entire session, not just for a query.
+ * Once set, this can never be unset because you can only kill the session once.
+ */
private String killReason = null;
private final Object amPluginInfoLock = new Object();
private AmPluginInfo amPluginInfo = null;
private SettableFuture<WmTezSession> amRegistryFuture = null;
private ScheduledFuture<?> timeoutTimer = null;
+ private String queryId;
private final WorkloadManager wmParent;
@@ -124,6 +129,7 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
void clearWm() {
this.poolName = null;
this.clusterFraction = 0f;
+ this.queryId = null;
}
double getClusterFraction() {
@@ -183,14 +189,12 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
}
void setIsIrrelevantForWm(String killReason) {
+ if (killReason == null) {
+ throw new AssertionError("Cannot reset the kill reason " + this.killReason);
+ }
this.killReason = killReason;
}
- @Override
- public String toString() {
- return super.toString() + ", poolName: " + poolName + ", clusterFraction: " + clusterFraction;
- }
-
private final class TimeoutRunnable implements Runnable {
@Override
public void run() {
@@ -202,4 +206,19 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
}
}
}
+
+ public void setQueryId(String queryId) {
+ this.queryId = queryId;
+ }
+
+ public String getQueryId() {
+ return this.queryId;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", WM state poolName=" + poolName + ", clusterFraction="
+ + clusterFraction + ", queryId=" + queryId + ", killReason=" + killReason;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/25f3c3f9/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index bdbcce5..0fdaf55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.ql.exec.tez.UserPoolMapping.MappingInput;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.KillQuery;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
@@ -67,35 +70,51 @@ import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-/** Workload management entry point for HS2. */
+/** Workload management entry point for HS2.
+ * Note on how this class operates.
+ * There are tons of things that could be happening in parallel that are a real pain to sync.
+ * Therefore, it uses an actor-ish model where the master thread, in processCurrentEvents method,
+ * processes a bunch of events that have accumulated since the previous iteration, repeatedly
+ * and quickly, doing physical work via async calls or via worker threads.
+ * That way the bulk of the state (pools, etc.) does not require any sync, and we mostly have
+ * a consistent view of the conflicting events when we process things. However, that also means
+ * none of that state can be accessed directly - most changes that touch pool state, or interact
+ * with background operations like init, need to go thru eventstate; see e.g. returnAfterUse.
+ */
public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
private static final char POOL_SEPARATOR = '.';
private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
+ // Various final services, configs, etc.
private final HiveConf conf;
private final TezSessionPool<WmTezSession> tezAmPool;
private final SessionExpirationTracker expirationTracker;
private final RestrictedConfigChecker restrictedConfig;
private final QueryAllocationManager allocationManager;
private final String yarnQueue;
+ private final int amRegistryTimeoutMs;
// Note: it's not clear that we need to track this - unlike PoolManager we don't have non-pool
// sessions, so the pool itself could internally track the sessions it gave out, since
// calling close on an unopened session is probably harmless.
private final IdentityHashMap<TezSessionPoolSession, Boolean> openSessions =
- new IdentityHashMap<>();
- private final int amRegistryTimeoutMs;
-
- // Note: pools can only be modified by the master thread.
- private HashMap<String, PoolState> pools;
- // Used to make sure that waiting getSessions don't block update.
- private UserPoolMapping userPoolMapping;
- private int totalQueryParallelism;
+ new IdentityHashMap<>();
// We index the get requests to make sure there are no ordering artifacts when we requeue.
private final AtomicLong getRequestVersion = new AtomicLong(Long.MIN_VALUE);
- private PerPoolTriggerValidatorRunnable triggerValidatorRunnable;
+ // The below group of fields (pools, etc.) can only be modified by the master thread.
+ private Map<String, PoolState> pools;
+ private int totalQueryParallelism;
+ /**
+ * The queries being killed. This is used to sync between the background kill finishing and the
+ * query finishing and user returning the sessions, which can happen in separate iterations
+ * of the master thread processing, yet need to be aware of each other.
+ */
+ private Map<WmTezSession, KillQueryContext> killQueryInProgress = new IdentityHashMap<>();
+ // Used to make sure that waiting getSessions don't block update.
+ private UserPoolMapping userPoolMapping;
+ // End of master thread state
// Note: we could use RW lock to allow concurrent calls for different sessions, however all
// those calls do is add elements to lists and maps; and we'd need to sync those separately
@@ -107,8 +126,16 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private final EventState one = new EventState(), two = new EventState();
private boolean hasChanges = false;
private EventState current = one;
+ private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
+ // End sync stuff.
+
+ private PerPoolTriggerValidatorRunnable triggerValidatorRunnable;
private Map<String, SessionTriggerProvider> perPoolProviders = new ConcurrentHashMap<>();
+ private SessionTriggerProvider sessionTriggerProvider;
+ private TriggerActionHandler triggerActionHandler;
+
+ // The master thread and various workers.
/** The master thread that processes the events from EventState. */
@VisibleForTesting
protected final Thread wmThread;
@@ -116,7 +143,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private final ExecutorService workPool;
/** Used to schedule timeouts for some async operations. */
private final ScheduledExecutorService timeoutPool;
- private final WmThreadSyncWork syncWork = new WmThreadSyncWork();
+
+ // The initial plan initialization future, to wait for the plan to apply during setup.
private ListenableFuture<Boolean> initRpFuture;
private static final FutureCallback<Object> FATAL_ERROR_CALLBACK = new FutureCallback<Object>() {
@@ -131,7 +159,6 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
};
- // TODO: this is temporary before HiveServerEnvironment is merged.
private static volatile WorkloadManager INSTANCE;
public static WorkloadManager getInstance() {
@@ -182,7 +209,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
startTriggerValidator(triggerValidationIntervalMs);
}
- private int determineQueryParallelism(WMFullResourcePlan plan) {
+ private static int determineQueryParallelism(WMFullResourcePlan plan) {
int result = 0;
for (WMPool pool : plan.getPools()) {
result += pool.getQueryParallelism();
@@ -245,7 +272,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
/** Represent a single iteration of work for the master thread. */
private final static class EventState {
private final Set<WmTezSession> toReturn = Sets.newIdentityHashSet(),
- toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
+ toDestroy = Sets.newIdentityHashSet(), updateErrors = Sets.newIdentityHashSet();
+ private final Map<WmTezSession, Boolean> killQueryResults = new IdentityHashMap<>();
private final LinkedList<SessionInitContext> initResults = new LinkedList<>();
private final IdentityHashMap<WmTezSession, SettableFuture<WmTezSession>> toReopen =
new IdentityHashMap<>();
@@ -279,8 +307,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
* (mostly opening and closing the sessions).
*/
private final static class WmThreadSyncWork {
- private LinkedList<WmTezSession> toRestartInUse = new LinkedList<>(),
- toDestroyNoRestart = new LinkedList<>();
+ private List<WmTezSession> toRestartInUse = new LinkedList<>(),
+ toDestroyNoRestart = new LinkedList<>();
+ private Map<WmTezSession, KillQueryContext> toKillQuery = new IdentityHashMap<>();
}
private void runWmThread() {
@@ -330,9 +359,43 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private void scheduleWork(WmThreadSyncWork context) {
// Do the work that cannot be done via async calls.
- // 1. Restart pool sessions.
+ // 1. Kill queries.
+ for (KillQueryContext killCtx : context.toKillQuery.values()) {
+ final WmTezSession toKill = killCtx.session;
+ final String reason = killCtx.reason;
+ LOG.info("Killing query for {}", toKill);
+ workPool.submit(() -> {
+ // Note: we get query ID here, rather than in the caller, where it would be more correct
+ // because we know which exact query we intend to kill. This is valid because we
+ // are not expecting query ID to change - we never reuse the session for which a
+ // query is being killed until both the kill, and the user, return it.
+ String queryId = toKill.getQueryId();
+ KillQuery kq = toKill.getKillQuery();
+ if (kq != null && queryId != null) {
+ LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
+ try {
+ kq.killQuery(queryId, reason);
+ addKillQueryResult(toKill, true);
+ LOG.debug("Killed " + queryId);
+ return;
+ } catch (HiveException ex) {
+ LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex);
+ }
+ } else {
+ LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq);
+ }
+ // We cannot restart in place because the user might receive a failure and return the
+ // session to the master thread without the "irrelevant" flag set. In fact, the query might
+ // have succeeded in the gap and the session might already be returned. Queue restart thru
+ // the master thread.
+ addKillQueryResult(toKill, false);
+ });
+ }
+ context.toKillQuery.clear();
+
+ // 2. Restart pool sessions.
for (final WmTezSession toRestart : context.toRestartInUse) {
- LOG.info("Replacing " + toRestart + " with a new session");
+ LOG.info("Replacing {} with a new session", toRestart);
workPool.submit(() -> {
try {
// Note: sessions in toRestart are always in use, so they cannot expire in parallel.
@@ -343,9 +406,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
});
}
context.toRestartInUse.clear();
- // 2. Destroy the sessions that we don't need anymore.
+
+ // 3. Destroy the sessions that we don't need anymore.
for (final WmTezSession toDestroy : context.toDestroyNoRestart) {
- LOG.info("Closing " + toDestroy + " without restart");
+ LOG.info("Closing {} without restart", toDestroy);
workPool.submit(() -> {
try {
toDestroy.close(false);
@@ -357,6 +421,17 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
context.toDestroyNoRestart.clear();
}
+ /**
+ * This is the main method of the master thread that processes one set of events.
+ * Be mindful of the fact that events can be queued while we are processing events, so
+ * in addition to making sure we keep the current set consistent (e.g. no need to handle
+ * update errors for a session that should already be destroyed), this needs to guard itself
+ * against the future iterations - e.g. what happens if we kill a query due to plan change,
+ * but the DAG finished before the kill happens and the user queues a "return" event? Etc.
+ * DO NOT block for a long time in this method.
+ * @param e Input events.
+ * @param syncWork Output tasks that cannot be called via async methods.
+ */
private void processCurrentEvents(EventState e, WmThreadSyncWork syncWork) throws Exception {
// The order of processing is as follows. We'd reclaim or kill all the sessions that we can
// reclaim from various user actions and errors, then apply the new plan if any,
@@ -374,40 +449,61 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
e.initResults.clear();
- // 1. Handle sessions that are being destroyed by users. Destroy implies return.
+ // 1. Handle kill query results - part 1, just put them in place. We will resolve what
+ // to do with the sessions after we go thru all the concurrent user actions.
+ for (Map.Entry<WmTezSession, Boolean> entry : e.killQueryResults.entrySet()) {
+ WmTezSession killQuerySession = entry.getKey();
+ boolean killResult = entry.getValue();
+ LOG.debug("Processing KillQuery {} for {}",
+ killResult ? "success" : "failure", killQuerySession);
+ // Note: do not cancel any user actions here; user actions actually interact with kills.
+ KillQueryContext killCtx = killQueryInProgress.get(killQuerySession);
+ if (killCtx == null) {
+ LOG.error("Internal error - cannot find the context for killing {}", killQuerySession);
+ continue;
+ }
+ killCtx.handleKillQueryCallback(!killResult);
+ }
+ e.killQueryResults.clear();
+
+ // 2. Handle sessions that are being destroyed by users. Destroy implies return.
for (WmTezSession sessionToDestroy : e.toDestroy) {
if (e.toReturn.remove(sessionToDestroy)) {
LOG.warn("The session was both destroyed and returned by the user; destroying");
}
LOG.info("Destroying {}", sessionToDestroy);
- Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
- e, sessionToDestroy, poolsToRedistribute);
- if (shouldReturn == null || shouldReturn) {
- // Restart if this session is still relevant, even if there's an internal error.
+ RemoveSessionResult rr = handleReturnedInUseSessionOnMasterThread(
+ e, sessionToDestroy, poolsToRedistribute, false);
+ if (rr == RemoveSessionResult.OK || rr == RemoveSessionResult.NOT_FOUND) {
+ // Restart even if there's an internal error.
syncWork.toRestartInUse.add(sessionToDestroy);
}
}
e.toDestroy.clear();
- // 2. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
+ // 3. Now handle actual returns. Sessions may be returned to the pool or may trigger expires.
for (WmTezSession sessionToReturn: e.toReturn) {
LOG.info("Returning {}", sessionToReturn);
- Boolean shouldReturn = handleReturnedInUseSessionOnMasterThread(
- e, sessionToReturn, poolsToRedistribute);
- if (shouldReturn == null) {
- // Restart if there's an internal error.
- syncWork.toRestartInUse.add(sessionToReturn);
- continue;
- }
- if (!shouldReturn) continue;
- boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
- if (!wasReturned) {
- syncWork.toDestroyNoRestart.add(sessionToReturn);
+ RemoveSessionResult rr = handleReturnedInUseSessionOnMasterThread(
+ e, sessionToReturn, poolsToRedistribute, true);
+ switch (rr) {
+ case OK:
+ boolean wasReturned = tezAmPool.returnSessionAsync(sessionToReturn);
+ if (!wasReturned) {
+ syncWork.toDestroyNoRestart.add(sessionToReturn);
+ }
+ break;
+ case NOT_FOUND:
+ syncWork.toRestartInUse.add(sessionToReturn); // Restart if there's an internal error.
+ break;
+ case IGNORE:
+ break;
+ default: throw new AssertionError("Unknown state " + rr);
}
}
e.toReturn.clear();
- // 3. Reopen is essentially just destroy + get a new session for a session in use.
+ // 4. Reopen is essentially just destroy + get a new session for a session in use.
for (Map.Entry<WmTezSession, SettableFuture<WmTezSession>> entry : e.toReopen.entrySet()) {
LOG.info("Reopening {}", entry.getKey());
handeReopenRequestOnMasterThread(
@@ -415,14 +511,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
e.toReopen.clear();
- // 4. All the sessions in use that were not destroyed or returned with a failed update now die.
+ // 5. All the sessions in use that were not destroyed or returned with a failed update now die.
for (WmTezSession sessionWithUpdateError : e.updateErrors) {
LOG.info("Update failed for {}", sessionWithUpdateError);
handleUpdateErrorOnMasterThread(sessionWithUpdateError, e, syncWork, poolsToRedistribute);
}
e.updateErrors.clear();
- // 5. Now apply a resource plan if any. This is expected to be pretty rare.
+ // 6. Now apply a resource plan if any. This is expected to be pretty rare.
boolean hasRequeues = false;
if (e.resourcePlanToApply != null) {
LOG.info("Applying new resource plan");
@@ -432,21 +528,19 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
e.resourcePlanToApply = null;
- // 6. Handle any move session requests. The way move session works right now is
+ // 7. Handle any move session requests. The way move session works right now is
// a) sessions get moved to destination pool if there is capacity in destination pool
// b) if there is no capacity in destination pool, the session gets killed (since we cannot pause a query)
// TODO: in future this the process of killing can be delayed until the point where a session is actually required.
// We could consider delaying the move (when destination capacity is full) until there is claim in src pool.
// May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long
// as possible
- if (e.moveSessions != null) {
- for (MoveSession moveSession : e.moveSessions) {
- handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute);
- }
+ for (MoveSession moveSession : e.moveSessions) {
+ handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute);
}
e.moveSessions.clear();
- // 7. Handle all the get/reuse requests. We won't actually give out anything here, but merely
+ // 8. Handle all the get/reuse requests. We won't actually give out anything here, but merely
// map all the requests and place them in an appropriate order in pool queues. The only
// exception is the reuse without queue contention; can be granted immediately. If we can't
// reuse the session immediately, we will convert the reuse to a normal get, because we
@@ -458,14 +552,42 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
e.toReuse.clear();
- // 8. If there was a cluster state change, make sure we redistribute all the pools.
+ // 9. Resolve all the kill query requests in flight. Nothing below can affect them.
+ Iterator<KillQueryContext> iter = killQueryInProgress.values().iterator();
+ while (iter.hasNext()) {
+ KillQueryContext ctx = iter.next();
+ KillQueryResult kr = ctx.process();
+ switch (kr) {
+ case IN_PROGRESS: continue; // Either the user or the kill is not done yet.
+ case OK: {
+ iter.remove();
+ LOG.debug("Kill query succeeded; returning to the pool: {}", ctx.session);
+ if (!tezAmPool.returnSessionAsync(ctx.session)) {
+ syncWork.toDestroyNoRestart.add(ctx.session);
+ }
+ break;
+ }
+ case RESTART_REQUIRED: {
+ iter.remove();
+ LOG.debug("Kill query failed; restarting: {}", ctx.session);
+ // Note: we assume here the session, before we resolve killQuery result here, is still
+ // "in use". That is because all the user ops above like return, reopen, etc.
+ // don't actually return/reopen/... when kill query is in progress.
+ syncWork.toRestartInUse.add(ctx.session);
+ break;
+ }
+ default: throw new AssertionError("Unknown state " + kr);
+ }
+ }
+
+ // 10. If there was a cluster state change, make sure we redistribute all the pools.
if (e.hasClusterStateChanged) {
LOG.info("Processing a cluster state change");
poolsToRedistribute.addAll(pools.keySet());
e.hasClusterStateChanged = false;
}
- // 9. Finally, for all the pools that have changes, promote queued queries and rebalance.
+ // 11. Finally, for all the pools that have changes, promote queued queries and rebalance.
for (String poolName : poolsToRedistribute) {
if (LOG.isDebugEnabled()) {
LOG.info("Processing changes for pool " + poolName + ": " + pools.get(poolName));
@@ -473,7 +595,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
}
- // 10. Notify tests and global async ops.
+ // 12. Save state for future iterations.
+ for (KillQueryContext killCtx : syncWork.toKillQuery.values()) {
+ if (killQueryInProgress.put(killCtx.session, killCtx) != null) {
+ LOG.error("One query killed several times - internal error {}", killCtx.session);
+ }
+ }
+
+ // 13. Notify tests and global async ops.
if (e.testEvent != null) {
e.testEvent.set(true);
e.testEvent = null;
@@ -491,8 +620,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
LOG.info("Handling move session event: {}", moveSession);
if (validMove(moveSession.srcSession, destPoolName)) {
// remove from src pool
- Boolean removed = checkAndRemoveSessionFromItsPool(moveSession.srcSession, poolsToRedistribute);
- if (removed != null && removed) {
+ RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+ moveSession.srcSession, poolsToRedistribute, true);
+ if (rr == RemoveSessionResult.OK) {
// check if there is capacity in dest pool, if so move else kill the session
if (capacityAvailable(destPoolName)) {
// add to destination pool
@@ -505,7 +635,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
} else {
moveSession.srcSession.clearWm();
- moveSession.srcSession.setIsIrrelevantForWm("Destination pool " + destPoolName + " is full. Killing query.");
+ moveSession.srcSession.setIsIrrelevantForWm("Destination pool "
+ + destPoolName + " is full. Killing query.");
syncWork.toRestartInUse.add(moveSession.srcSession);
}
} else {
@@ -575,8 +706,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- private Boolean handleReturnedInUseSessionOnMasterThread(
- EventState e, WmTezSession session, HashSet<String> poolsToRedistribute) {
+ private RemoveSessionResult handleReturnedInUseSessionOnMasterThread(
+ EventState e, WmTezSession session, HashSet<String> poolsToRedistribute, boolean isReturn) {
// This handles the common logic for destroy and return - everything except
// the invalid combination of destroy and return themselves, as well as the actual
// statement that destroys or returns it.
@@ -591,7 +722,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (reuseRequest != null) {
reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
}
- return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
+ return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn);
}
private void handeReopenRequestOnMasterThread(EventState e, WmTezSession session,
@@ -608,26 +739,32 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// anything. Instead, we will try to give out an existing session from the pool, and restart
// the problematic one in background.
String poolName = session.getPoolName();
- Boolean isRemoved = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute);
- // If we fail to remove, it's probably an internal error. We'd try to handle it the same way
- // as above - by restarting the session. We'd fail the caller to avoid exceeding parallelism.
- if (isRemoved == null) {
- future.setException(new RuntimeException("Reopen failed due to an internal error"));
+ RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, false);
+ switch (rr) {
+ case OK:
+ // If pool didn't exist, checkAndRemoveSessionFromItsPool wouldn't have returned OK.
+ PoolState pool = pools.get(poolName);
+ SessionInitContext sw = new SessionInitContext(future, poolName, session.getQueryId());
+ // We have just removed the session from the same pool, so don't check concurrency here.
+ pool.initializingSessions.add(sw);
+ ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+ Futures.addCallback(getFuture, sw);
syncWork.toRestartInUse.add(session);
return;
- } else if (!isRemoved) {
+ case IGNORE:
+ // Reopen implies the use of the reopened session for the same query that we gave it out
+ // for; so, as we would have failed an active query, fail the user before it's started.
future.setException(new RuntimeException("WM killed this session during reopen: "
- + session.getReasonForKill()));
- return; // No longer relevant for WM - bail.
+ + session.getReasonForKill()));
+ return; // No longer relevant for WM.
+ case NOT_FOUND:
+ // If we fail to remove, it's probably an internal error. We'd try to handle it the same way
+ // as above - by restarting the session. We'd fail the caller to avoid exceeding parallelism.
+ future.setException(new RuntimeException("Reopen failed due to an internal error"));
+ syncWork.toRestartInUse.add(session);
+ return;
+ default: throw new AssertionError("Unknown state " + rr);
}
- // If pool didn't exist, removeSessionFromItsPool would have returned null.
- PoolState pool = pools.get(poolName);
- SessionInitContext sw = new SessionInitContext(future, poolName);
- // We have just removed the session from the same pool, so don't check concurrency here.
- pool.initializingSessions.add(sw);
- ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
- Futures.addCallback(getFuture, sw);
- syncWork.toRestartInUse.add(session);
}
private void handleUpdateErrorOnMasterThread(WmTezSession sessionWithUpdateError,
@@ -640,17 +777,23 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// TODO: we should communicate this to the user more explicitly (use kill query API, or
// add an option for bg kill checking to TezTask/monitor?
// We are assuming the update-error AM is bad and just try to kill it.
- Boolean isRemoved = checkAndRemoveSessionFromItsPool(sessionWithUpdateError, poolsToRedistribute);
- if (isRemoved != null && !isRemoved) {
- // An update error for some session that was actually already killed by us.
- return;
+ RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+ sessionWithUpdateError, poolsToRedistribute, null);
+ switch (rr) {
+ case OK:
+ case NOT_FOUND:
+ // Regardless whether it was removed successfully or after failing to remove, restart it.
+ // Since we just restart this from under the user, mark it so we handle it properly when
+ // the user tries to actually use this session and fails, proceeding to return/destroy it.
+ sessionWithUpdateError.setIsIrrelevantForWm("Failed to update resource allocation");
+ // We assume AM might be bad so we will not try to kill the query here; just scrap the AM.
+ syncWork.toRestartInUse.add(sessionWithUpdateError);
+ break;
+ case IGNORE:
+ return; // An update error for some session that was actually already killed by us.
+ default:
+ throw new AssertionError("Unknown state " + rr);
}
- // Regardless whether it was removed successfully or after failing to remove, restart it.
- // Since we just restart this from under the user, mark it so we handle it properly when
- // the user tries to actually use this session and fails, proceeding to return/destroy it.
- // TODO: propagate this error to TezJobMonitor somehow, after we add the use of KillQuery.
- sessionWithUpdateError.setIsIrrelevantForWm("Failed to update resource allocation");
- syncWork.toRestartInUse.add(sessionWithUpdateError);
}
private void applyNewResourcePlanOnMasterThread(
@@ -661,7 +804,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// that fractions or query parallelism add up, etc.
this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings(),
e.resourcePlanToApply.getPlan().getDefaultPoolPath());
- HashMap<String, PoolState> oldPools = pools;
+ Map<String, PoolState> oldPools = pools;
pools = new HashMap<>();
// For simplicity, to always have parents while storing pools in a flat structure, we'll
@@ -692,7 +835,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
state = new PoolState(fullName, qp, fraction);
} else {
// This will also take care of the queries if query parallelism changed.
- state.update(qp, fraction, syncWork.toRestartInUse, e);
+ state.update(qp, fraction, syncWork.toKillQuery, e);
poolsToRedistribute.add(fullName);
}
state.setTriggers(new LinkedList<Trigger>());
@@ -719,7 +862,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (oldPools != null && !oldPools.isEmpty()) {
// Looks like some pools were removed; kill running queries, re-queue the queued ones.
for (PoolState oldPool : oldPools.values()) {
- oldPool.destroy(syncWork.toRestartInUse, e.getRequests, e.toReuse);
+ oldPool.destroy(syncWork.toKillQuery, e.getRequests, e.toReuse);
}
}
@@ -728,17 +871,34 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
this.totalQueryParallelism = totalQueryParallelism;
if (deltaSessions == 0) return; // Nothing to do.
if (deltaSessions < 0) {
- // First, see if we have unused sessions that we were planning to restart; get rid of those.
- int toTransfer = Math.min(-deltaSessions, syncWork.toRestartInUse.size());
- for (int i = 0; i < toTransfer; ++i) {
- syncWork.toDestroyNoRestart.add(syncWork.toRestartInUse.pollFirst());
- }
- deltaSessions += toTransfer;
+ // First, see if we have sessions that we were planning to restart/kill; get rid of those.
+ deltaSessions = transferSessionsToDestroy(
+ syncWork.toKillQuery.keySet(), syncWork.toDestroyNoRestart, deltaSessions);
+ deltaSessions = transferSessionsToDestroy(
+ syncWork.toRestartInUse, syncWork.toDestroyNoRestart, deltaSessions);
}
if (deltaSessions != 0) {
- failOnFutureFailure(tezAmPool.resizeAsync(
- deltaSessions, syncWork.toDestroyNoRestart));
+ failOnFutureFailure(tezAmPool.resizeAsync(deltaSessions, syncWork.toDestroyNoRestart));
+ }
+ }
+
+ private static int transferSessionsToDestroy(Collection<WmTezSession> source,
+ List<WmTezSession> toDestroy, int deltaSessions) {
+ // We were going to kill some queries and reuse the sessions, or maybe restart and put the new
+ // ones back into the AM pool. However, the AM pool has shrunk, so we will close them instead.
+ if (deltaSessions >= 0) return deltaSessions;
+ int toTransfer = Math.min(-deltaSessions, source.size());
+ Iterator<WmTezSession> iter = source.iterator();
+ for (int i = 0; i < toTransfer; ++i) {
+ WmTezSession session = iter.next();
+ LOG.debug("Will destroy {} instead of restarting", session);
+ if (!session.isIrrelevantForWm()) {
+ session.setIsIrrelevantForWm("Killed due to workload management plan change");
+ }
+ toDestroy.add(session);
+ iter.remove();
}
+ return deltaSessions + toTransfer;
}
@SuppressWarnings("unchecked")
@@ -768,9 +928,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Kills that could have removed it must have cleared sessionToReuse.
String oldPoolName = req.sessionToReuse.getPoolName();
oldPool = pools.get(oldPoolName);
- Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
- if (isRemoved == null || !isRemoved) {
- // This is probably an internal error... abandon the reuse attempt.
+ RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+ req.sessionToReuse, poolsToRedistribute, true);
+ if (rr != RemoveSessionResult.OK) {
+ // Abandon the reuse attempt.
returnSessionOnFailedReuse(req, syncWork, null);
req.sessionToReuse = null;
} else if (pool.getTotalActiveSessions() + pool.queue.size() >= pool.queryParallelism) {
@@ -786,6 +947,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// If we can immediately reuse a session, there's nothing to wait for - just return.
req.sessionToReuse.setPoolName(poolName);
req.sessionToReuse.setQueueName(yarnQueue);
+ req.sessionToReuse.setQueryId(req.queryId);
pool.sessions.add(req.sessionToReuse);
if (pool != oldPool) {
poolsToRedistribute.add(poolName);
@@ -822,7 +984,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// Note that in theory, we are guaranteed to have a session waiting for us here, but
// the expiration, failures, etc. may cause one to be missing pending restart.
// See SessionInitContext javadoc.
- SessionInitContext sw = new SessionInitContext(queueReq.future, poolName);
+ SessionInitContext sw = new SessionInitContext(queueReq.future, poolName, queueReq.queryId);
ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
Futures.addCallback(getFuture, sw);
// It is possible that all the async methods returned on the same thread because the
@@ -847,32 +1009,54 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
private void returnSessionOnFailedReuse(
- GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
- if (req.sessionToReuse == null) return;
+ GetRequest req, WmThreadSyncWork syncWork, HashSet<String> poolsToRedistribute) {
+ WmTezSession session = req.sessionToReuse;
+ if (session == null) return;
+ req.sessionToReuse = null;
if (poolsToRedistribute != null) {
- Boolean isRemoved = checkAndRemoveSessionFromItsPool(req.sessionToReuse, poolsToRedistribute);
- // The session cannot have been killed; this happens after all the kills in the current
- // iteration, so we would have cleared sessionToReuse when killing this.
- assert isRemoved == null || isRemoved;
+ RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
+ session, poolsToRedistribute, true);
+ // The session cannot have been killed just now; this happens after all the kills in
+ // the current iteration, so we would have cleared sessionToReuse when killing this.
+ boolean isOk = (rr == RemoveSessionResult.OK);
+ assert isOk || rr == RemoveSessionResult.IGNORE;
+ if (!isOk) return;
}
- if (!tezAmPool.returnSessionAsync(req.sessionToReuse)) {
- syncWork.toDestroyNoRestart.add(req.sessionToReuse);
+ if (!tezAmPool.returnSessionAsync(session)) {
+ syncWork.toDestroyNoRestart.add(session);
}
- req.sessionToReuse = null;
+ }
+
+ /** The result of trying to remove a presumably-active session from a pool on a user request. */
+ private static enum RemoveSessionResult {
+ OK, // Normal case - an active session was removed from the pool.
+ IGNORE, // Session was restarted out of bounds, any user-side handling should be ignored.
+ // Or, session is being killed, need to coordinate between that and the user.
+ // These two cases don't need to be distinguished for now.
+ NOT_FOUND // The session is active but not found in the pool - internal error.
}
/**
* Checks if the session is still relevant for WM and if yes, removes it from its pool.
+ * @param isSessionOk Whether the user thinks the session being returned in some way is ok;
+ * true means it is (return, reuse); false mean it isn't (reopen, destroy);
+ * null means this is not a user call.
* @return OK if the session was removed; IGNORE if the session was already processed by WM
* thread (so we are dealing with an outdated request) or is being killed; NOT_FOUND if the
* session should be in WM but wasn't found in the requisite pool (internal error).
*/
- private Boolean checkAndRemoveSessionFromItsPool(
- WmTezSession session, HashSet<String> poolsToRedistribute) {
+ private RemoveSessionResult checkAndRemoveSessionFromItsPool(
+ WmTezSession session, HashSet<String> poolsToRedistribute, Boolean isSessionOk) {
// It is possible for some request to be queued after a main thread has decided to kill this
// session; on the next iteration, we'd be processing that request with an irrelevant session.
if (session.isIrrelevantForWm()) {
- return false;
+ return RemoveSessionResult.IGNORE;
+ }
+ if (killQueryInProgress.containsKey(session)) {
+ if (isSessionOk != null) {
+ killQueryInProgress.get(session).handleUserCallback(!isSessionOk);
+ }
+ return RemoveSessionResult.IGNORE;
}
// If we did not kill this session we expect everything to be present.
String poolName = session.getPoolName();
@@ -881,11 +1065,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
PoolState pool = pools.get(poolName);
session.clearWm();
if (pool != null && pool.sessions.remove(session)) {
- return true;
+ return RemoveSessionResult.OK;
}
}
LOG.error("Session was not in the pool (internal error) " + poolName + ": " + session);
- return null;
+ return RemoveSessionResult.NOT_FOUND;
}
private Boolean checkAndAddSessionToAnotherPool(
@@ -955,11 +1139,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private final MappingInput mappingInput;
private final SettableFuture<WmTezSession> future;
private WmTezSession sessionToReuse;
+ private final String queryId;
- private GetRequest(MappingInput mappingInput, SettableFuture<WmTezSession> future,
- WmTezSession sessionToReuse, long order) {
+ private GetRequest(MappingInput mappingInput, String queryId,
+ SettableFuture<WmTezSession> future, WmTezSession sessionToReuse, long order) {
assert mappingInput != null;
this.mappingInput = mappingInput;
+ this.queryId = queryId;
this.future = future;
this.sessionToReuse = sessionToReuse;
this.order = order;
@@ -975,10 +1161,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
TezSessionState session, MappingInput input, HiveConf conf) throws Exception {
// Note: not actually used for pool sessions; verify some things like doAs are not set.
validateConfig(conf);
+ String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
SettableFuture<WmTezSession> future = SettableFuture.create();
WmTezSession wmSession = checkSessionForReuse(session);
GetRequest req = new GetRequest(
- input, future, wmSession, getRequestVersion.incrementAndGet());
+ input, queryId, future, wmSession, getRequestVersion.incrementAndGet());
currentLock.lock();
try {
current.getRequests.add(req);
@@ -1049,6 +1236,17 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
+ private void addKillQueryResult(WmTezSession toKill, boolean success) {
+ currentLock.lock();
+ try {
+ current.killQueryResults.put(toKill, success);
+ notifyWmThreadUnderLock();
+ } finally {
+ currentLock.unlock();
+ }
+ }
+
+
@VisibleForTesting
/**
* Adds a test event that's processed at the end of WM iteration.
@@ -1271,7 +1469,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
public void update(int queryParallelism, double fraction,
- List<WmTezSession> toKill, EventState e) {
+ Map<WmTezSession, KillQueryContext> toKill, EventState e) {
this.finalFraction = this.finalFractionRemaining = fraction;
this.queryParallelism = queryParallelism;
// TODO: two possible improvements
@@ -1290,8 +1488,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- public void destroy(List<WmTezSession> toKill, LinkedList<GetRequest> globalQueue,
- IdentityHashMap<WmTezSession, GetRequest> toReuse) {
+ public void destroy(Map<WmTezSession, KillQueryContext> toKill,
+ LinkedList<GetRequest> globalQueue, IdentityHashMap<WmTezSession, GetRequest> toReuse) {
extractAllSessionsToKill("The query pool was removed by administrator", toReuse, toKill);
// All the pending get requests should just be requeued elsewhere.
// Note that we never queue session reuse so sessionToReuse would be null.
@@ -1323,10 +1521,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
private void extractAllSessionsToKill(String killReason,
- IdentityHashMap<WmTezSession, GetRequest> toReuse, List<WmTezSession> toKill) {
+ IdentityHashMap<WmTezSession, GetRequest> toReuse,
+ Map<WmTezSession, KillQueryContext> toKill) {
for (WmTezSession sessionToKill : sessions) {
- resetRemovedSession(sessionToKill, killReason, toReuse);
- toKill.add(sessionToKill);
+ resetRemovedSession(sessionToKill, toReuse);
+ toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
}
sessions.clear();
for (SessionInitContext initCtx : initializingSessions) {
@@ -1336,16 +1535,14 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (sessionToKill == null) {
continue; // Async op in progress; the callback will take care of this.
}
- resetRemovedSession(sessionToKill, killReason, toReuse);
- toKill.add(sessionToKill);
+ resetRemovedSession(sessionToKill, toReuse);
+ toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
}
initializingSessions.clear();
}
- private void resetRemovedSession(WmTezSession sessionToKill, String killReason,
- IdentityHashMap<WmTezSession, GetRequest> toReuse) {
- assert killReason != null;
- sessionToKill.setIsIrrelevantForWm(killReason);
+ private void resetRemovedSession(WmTezSession sessionToKill,
+ IdentityHashMap<WmTezSession, GetRequest> toReuse) {
sessionToKill.clearWm();
GetRequest req = toReuse.remove(sessionToKill);
if (req != null) {
@@ -1375,7 +1572,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
* for async session initialization, as well as parallel cancellation.
*/
private final class SessionInitContext implements FutureCallback<WmTezSession> {
- private final String poolName;
+ private final String poolName, queryId;
private final ReentrantLock lock = new ReentrantLock();
private WmTezSession session;
@@ -1383,10 +1580,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private SessionInitState state;
private String cancelReason;
- public SessionInitContext(SettableFuture<WmTezSession> future, String poolName) {
+ public SessionInitContext(SettableFuture<WmTezSession> future, String poolName, String queryId) {
this.state = SessionInitState.GETTING;
this.future = future;
this.poolName = poolName;
+ this.queryId = queryId;
}
@Override
@@ -1402,6 +1600,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
assert this.state == SessionInitState.GETTING;
session.setPoolName(poolName);
session.setQueueName(yarnQueue);
+ session.setQueryId(queryId);
this.session = session;
this.state = SessionInitState.WAITING_FOR_REGISTRY;
break;
@@ -1448,6 +1647,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
"The query was killed by workload management: " + cancelReason));
session.setPoolName(null);
session.setClusterFraction(0f);
+ session.setQueryId(null);
tezAmPool.returnSession(session);
break;
}
@@ -1541,6 +1741,74 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
return userPoolMapping.mapSessionToPoolName(input) != null;
}
+ private static enum KillQueryResult {
+ OK,
+ RESTART_REQUIRED,
+ IN_PROGRESS
+ }
+
+ /**
+ * When we kill a query without killing a session, we need two things to come back before reuse.
+ * First of all, the kill-query call itself should come back, and second, the user should handle
+ * and let go of the session (or, the query could finish and it could give the session back
+ * even before we try to kill the query). We also need to handle cases where the user doesn't
+ * like the session even before we kill it, or the kill fails and the user is happily computing
+ * away. This class is to collect and make sense of the state around all this.
+ */
+ private static final class KillQueryContext {
+ private final String reason;
+ private final WmTezSession session;
+ // Note: all the fields are only modified by master thread.
+ private boolean isUserDone = false, isKillDone = false,
+ hasKillFailed = false, hasUserFailed = false;
+
+ public KillQueryContext(WmTezSession session, String reason) {
+ this.session = session;
+ this.reason = reason;
+ }
+
+ private void handleKillQueryCallback(boolean hasFailed) {
+ isKillDone = true;
+ hasKillFailed = hasFailed;
+ }
+
+ private void handleUserCallback(boolean hasFailed) {
+ if (isUserDone) {
+ LOG.warn("Duplicate user call for a session being killed; ignoring");
+ return;
+ }
+ isUserDone = true;
+ hasUserFailed = hasFailed;
+ }
+
+ private KillQueryResult process() {
+ if (!isUserDone && hasKillFailed) {
+ // The user has not returned and the kill has failed.
+ // We are going to brute force kill the AM; whatever user does is now irrelevant.
+ session.setIsIrrelevantForWm(reason);
+ return KillQueryResult.RESTART_REQUIRED;
+ }
+ if (!isUserDone || !isKillDone) return KillQueryResult.IN_PROGRESS; // Someone is not done.
+ // Both user and the kill have returned.
+ if (hasUserFailed && hasKillFailed) {
+ // If the kill failed and the user also thinks the session is invalid, restart it.
+ session.setIsIrrelevantForWm(reason);
+ return KillQueryResult.RESTART_REQUIRED;
+ }
+ // Otherwise, we can reuse the session. Either the kill has failed but the user managed to
+ // return early (in fact, can it fail because the query has completed earlier?), or the user
+ // has failed because the query was killed from under it.
+ return KillQueryResult.OK;
+ }
+
+ @Override
+ public String toString() {
+ return "KillQueryContext [isUserDone=" + isUserDone + ", isKillDone=" + isKillDone
+ + ", hasKillFailed=" + hasKillFailed + ", hasUserFailed=" + hasUserFailed
+ + ", session=" + session + ", reason=" + reason + "]";
+ }
+ }
+
@VisibleForTesting
TezSessionPool<WmTezSession> getTezAmPool() {
return tezAmPool;
http://git-wip-us.apache.org/repos/asf/hive/blob/25f3c3f9/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index a73a24a..e220837 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -506,6 +506,8 @@ public class TestWorkloadManager {
mapping("C", "C"), mapping("D", "D")));
final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
+ TezSessionPool<WmTezSession> tezAmPool = wm.getTezAmPool();
+ assertEquals(6, tezAmPool.getCurrentSize());
// A: 1/1 running, 1 queued; B: 2/2 running, C: 1/2 running, D: 1/1 running, 1 queued.
// Total: 5/6 running.
@@ -525,6 +527,7 @@ public class TestWorkloadManager {
checkError(error);
assertEquals(0.3f, sessionC1.getClusterFraction(), EPSILON);
assertEquals(0.3f, sessionD1.getClusterFraction(), EPSILON);
+ assertEquals(1, tezAmPool.getCurrentSize());
// Change the resource plan - resize B and C down, D up, and remove A remapping users to B.
// Everything will be killed in A and B, C won't change, D will start one more query from
@@ -549,14 +552,16 @@ public class TestWorkloadManager {
assertEquals(0.3f, sessionA2.get().getClusterFraction(), EPSILON);
assertEquals(0.2f, sessionC1.getClusterFraction(), EPSILON);
assertEquals(0.25f, sessionD1.getClusterFraction(), EPSILON);
-
assertKilledByWm(sessionA1);
assertKilledByWm(sessionB1);
assertKilledByWm(sessionB2);
+ assertEquals(0, tezAmPool.getCurrentSize());
// Wait for another iteration to make sure event gets processed for D2 to receive allocation.
sessionA2.get().returnToSessionManager();
assertEquals(0.25f, sessionD2.get().getClusterFraction(), EPSILON);
+ // Return itself should be a no-op - the pool went from 6 to 4 with 1 session in the pool.
+
sessionD2.get().returnToSessionManager();
sessionC1.returnToSessionManager();
sessionD1.returnToSessionManager();
@@ -564,8 +569,8 @@ public class TestWorkloadManager {
// Try to "return" stuff that was killed from "under" us. Should be a no-op.
sessionA1.returnToSessionManager();
sessionB1.returnToSessionManager();
- sessionB2.returnToSessionManager();
- assertEquals(4, wm.getTezAmPool().getCurrentSize());
+ sessionB2.returnToSessionManager();
+ assertEquals(4, tezAmPool.getCurrentSize());
}
@Test(timeout=10000)