You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text conversion.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/29 20:21:05 UTC
[2/2] hive git commit: HIVE-18076 : killquery doesn't actually work for non-trigger WM kills (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
HIVE-18076 : killquery doesn't actually work for non-trigger WM kills (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6760b01
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6760b01
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6760b01
Branch: refs/heads/master
Commit: b6760b017cf726a82d727d2970cdcfc0e7eebcb2
Parents: 3500196
Author: sergey <se...@apache.org>
Authored: Wed Nov 29 12:17:01 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Nov 29 12:17:01 2017 -0800
----------------------------------------------------------------------
.../TestLlapTaskSchedulerService.java | 1 -
.../hadoop/hive/ql/exec/tez/TezSessionPool.java | 10 ++-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 11 ++-
.../hadoop/hive/ql/exec/tez/WmTezSession.java | 1 -
.../hive/ql/exec/tez/WorkloadManager.java | 77 +++++++++++---------
5 files changed, 58 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 5460248..90b31e4 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -429,7 +429,6 @@ public class TestLlapTaskSchedulerService {
TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1), ti2 = tsWrapper.ts.getTaskInfo(task2);
assertFalse(ti1.isGuaranteed() || ti2.isGuaranteed());
- // TODO# ts.notifyStarted(task);
tsWrapper.ts.updateGuaranteedCount(1);
tsWrapper.ts.waitForMessagesSent(1);
TaskInfo tiHigher = ti1.isGuaranteed() ? ti1 : ti2, tiLower = (tiHigher == ti1) ? ti2 : ti1;
http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 4d2cf88..3bcf657 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -107,6 +107,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
int threadCount = Math.min(initialSize,
HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
Preconditions.checkArgument(threadCount > 0);
+ this.parentSessionState = SessionState.get();
if (threadCount == 1) {
for (int i = 0; i < initialSize; ++i) {
SessionType session = sessionObjFactory.create(null);
@@ -115,7 +116,6 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
}
} else {
final AtomicInteger remaining = new AtomicInteger(initialSize);
- this.parentSessionState = SessionState.get();
@SuppressWarnings("unchecked")
FutureTask<Boolean>[] threadTasks = new FutureTask[threadCount];
for (int i = threadTasks.length - 1; i >= 0; --i) {
@@ -272,6 +272,14 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
// The caller probably created the new session with the old config, but update the
// registry again just in case. TODO: maybe we should enforce that.
configureAmRegistry(newSession);
+ if (SessionState.get() == null && parentSessionState != null) {
+ // Tez session relies on a threadlocal for open... If we are on some non-session thread,
+ // just use the same SessionState we used for the initial sessions.
+ // Technically, given that all pool sessions are initially based on this state, shouldn't
+ // we also set this at all times and not rely on external session state? We should
+ // probably just get rid of the thread local usage in TezSessionState.
+ SessionState.setCurrentSessionState(parentSessionState);
+ }
newSession.open(additionalFiles, scratchDir);
if (!putSessionBack(newSession, false)) {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 8087b01..af77f30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -171,9 +171,10 @@ public class TezTask extends Task<TezWork> {
TriggerContext triggerContext = ctx.getTriggerContext();
triggerContext.setDesiredCounters(desiredCounters);
- session.setTriggerContext(triggerContext);
- LOG.info("Subscribed to counters: {} for queryId: {}", desiredCounters, triggerContext.getQueryId());
+ LOG.info("Subscribed to counters: {} for queryId: {}",
+ desiredCounters, triggerContext.getQueryId());
ss.setTezSession(session);
+ session.setTriggerContext(triggerContext);
try {
// jobConf will hold all the configuration for hadoop, tez, and hive
JobConf jobConf = utils.createConfiguration(conf);
@@ -190,8 +191,7 @@ public class TezTask extends Task<TezWork> {
// This is used to compare global and vertex resources. Global resources are originally
// derived from session conf via localizeTempFilesFromConf. So, use that here.
- Configuration sessionConf =
- (session != null && session.getConf() != null) ? session.getConf() : conf;
+ Configuration sessionConf = (session.getConf() != null) ? session.getConf() : conf;
Map<String,LocalResource> inputOutputLocalResources =
getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf);
@@ -584,7 +584,10 @@ public class TezTask extends Task<TezWork> {
try {
console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
+ Arrays.toString(e.getStackTrace()) + " retrying...");
+ // TODO: this is temporary, need to refactor how reopen is invoked.
+ TriggerContext oldCtx = sessionState.getTriggerContext();
sessionState = sessionState.reopen(conf, inputOutputJars);
+ sessionState.setTriggerContext(oldCtx);
dagClient = sessionState.getSession().submitDAG(dag);
} catch (Exception retryException) {
// we failed to submit after retrying. Destroy session and bail.
http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 96d70c9..d61c531 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -143,7 +143,6 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
void clearWm() {
this.poolName = null;
this.clusterFraction = 0f;
- this.queryId = null;
}
double getClusterFraction() {
http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index a8360bd..388a4f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -1,5 +1,4 @@
/**
-/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -396,18 +395,22 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// query is being killed until both the kill, and the user, return it.
String queryId = toKill.getQueryId();
KillQuery kq = toKill.getKillQuery();
- if (kq != null && queryId != null) {
- LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
- try {
- kq.killQuery(queryId, reason);
- addKillQueryResult(toKill, true);
- LOG.debug("Killed " + queryId);
- return;
- } catch (HiveException ex) {
- LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex);
+ try {
+ if (kq != null && queryId != null) {
+ LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
+ try {
+ kq.killQuery(queryId, reason);
+ addKillQueryResult(toKill, true);
+ LOG.debug("Killed " + queryId);
+ return;
+ } catch (HiveException ex) {
+ LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex);
+ }
+ } else {
+ LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq);
}
- } else {
- LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq);
+ } finally {
+ toKill.setQueryId(null);
}
// We cannot restart in place because the user might receive a failure and return the
// session to the master thread without the "irrelevant" flag set. In fact, the query might
@@ -421,12 +424,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// 2. Restart pool sessions.
for (final WmTezSession toRestart : context.toRestartInUse) {
LOG.info("Replacing {} with a new session", toRestart);
+ toRestart.setQueryId(null);
workPool.submit(() -> {
try {
// Note: sessions in toRestart are always in use, so they cannot expire in parallel.
tezAmPool.replaceSession(toRestart, false, null);
} catch (Exception ex) {
- LOG.error("Failed to restart an old session; ignoring " + ex.getMessage());
+ LOG.error("Failed to restart an old session; ignoring", ex);
}
});
}
@@ -564,7 +568,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long
// as possible
for (MoveSession moveSession : e.moveSessions) {
- handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute);
+ handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse);
}
e.moveSessions.clear();
@@ -676,9 +680,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
- private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
- final WmThreadSyncWork syncWork,
- final HashSet<String> poolsToRedistribute) {
+ private void handleMoveSessionOnMasterThread(MoveSession moveSession, WmThreadSyncWork syncWork,
+ Set<String> poolsToRedistribute, Map<WmTezSession, GetRequest> toReuse) {
String destPoolName = moveSession.destPool;
LOG.info("Handling move session event: {}", moveSession);
if (validMove(moveSession.srcSession, destPoolName)) {
@@ -689,7 +692,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// check if there is capacity in dest pool, if so move else kill the session
if (capacityAvailable(destPoolName)) {
// add to destination pool
- Boolean added = checkAndAddSessionToAnotherPool(moveSession.srcSession, destPoolName, poolsToRedistribute);
+ Boolean added = checkAndAddSessionToAnotherPool(
+ moveSession.srcSession, destPoolName, poolsToRedistribute);
if (added != null && added) {
moveSession.future.set(true);
return;
@@ -697,10 +701,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession);
}
} else {
- moveSession.srcSession.clearWm();
- moveSession.srcSession.setIsIrrelevantForWm("Destination pool "
- + destPoolName + " is full. Killing query.");
- syncWork.toRestartInUse.add(moveSession.srcSession);
+ WmTezSession session = moveSession.srcSession;
+ resetRemovedSessionToKill(session, toReuse);
+ syncWork.toKillQuery.put(session, new KillQueryContext(session, "Destination pool "
+ + destPoolName + " is full. Killing query."));
}
} else {
LOG.error("Failed to move session: {}. Session is not removed from its pool.", moveSession);
@@ -785,6 +789,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (reuseRequest != null) {
reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
}
+ session.setQueryId(null);
return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn);
}
@@ -1086,6 +1091,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
WmTezSession session = req.sessionToReuse;
if (session == null) return;
req.sessionToReuse = null;
+ session.setQueryId(null);
if (poolsToRedistribute != null) {
RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
session, poolsToRedistribute, true);
@@ -1119,7 +1125,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
* in WM but wasn't found in the requisite pool (internal error?).
*/
private RemoveSessionResult checkAndRemoveSessionFromItsPool(
- WmTezSession session, HashSet<String> poolsToRedistribute, Boolean isSessionOk) {
+ WmTezSession session, Set<String> poolsToRedistribute, Boolean isSessionOk) {
// It is possible for some request to be queued after a main thread has decided to kill this
// session; on the next iteration, we'd be processing that request with an irrelevant session.
if (session.isIrrelevantForWm()) {
@@ -1146,7 +1152,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
private Boolean checkAndAddSessionToAnotherPool(
- WmTezSession session, String destPoolName, HashSet<String> poolsToRedistribute) {
+ WmTezSession session, String destPoolName, Set<String> poolsToRedistribute) {
if (session.isIrrelevantForWm()) {
// This is called only during move session handling, removing session already checks this.
// So this is not expected as remove failing will not even invoke this method
@@ -1624,7 +1630,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
IdentityHashMap<WmTezSession, GetRequest> toReuse,
Map<WmTezSession, KillQueryContext> toKill) {
for (WmTezSession sessionToKill : sessions) {
- resetRemovedSession(sessionToKill, toReuse);
+ resetRemovedSessionToKill(sessionToKill, toReuse);
toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
}
sessions.clear();
@@ -1635,21 +1641,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
if (sessionToKill == null) {
continue; // Async op in progress; the callback will take care of this.
}
- resetRemovedSession(sessionToKill, toReuse);
+ resetRemovedSessionToKill(sessionToKill, toReuse);
toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
}
initializingSessions.clear();
}
- private void resetRemovedSession(WmTezSession sessionToKill,
- IdentityHashMap<WmTezSession, GetRequest> toReuse) {
- sessionToKill.clearWm();
- GetRequest req = toReuse.remove(sessionToKill);
- if (req != null) {
- req.sessionToReuse = null;
- }
- }
-
public void setTriggers(final LinkedList<Trigger> triggers) {
this.triggers = triggers;
}
@@ -1786,6 +1783,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
if (session != null) {
session.clearWm();
+ session.setQueryId(null);
// We can just restart the session if we have received one.
try {
tezAmPool.replaceSession(session, false, null);
@@ -1914,6 +1912,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
}
+ private static void resetRemovedSessionToKill(
+ WmTezSession sessionToKill, Map<WmTezSession, GetRequest> toReuse) {
+ sessionToKill.clearWm();
+ GetRequest req = toReuse.remove(sessionToKill);
+ if (req != null) {
+ req.sessionToReuse = null;
+ }
+ }
+
@VisibleForTesting
TezSessionPool<WmTezSession> getTezAmPool() {
return tezAmPool;