You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/12/21 02:57:20 UTC
hive git commit: HIVE-18078 : WM getSession needs some retry logic
(Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 3407e723a -> 4e43ec7c4
HIVE-18078 : WM getSession needs some retry logic (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4e43ec7c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4e43ec7c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4e43ec7c
Branch: refs/heads/master
Commit: 4e43ec7c4c623537bf46b74ba5c01f314db72162
Parents: 3407e72
Author: sergey <se...@apache.org>
Authored: Wed Dec 20 18:51:51 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Dec 20 18:51:51 2017 -0800
----------------------------------------------------------------------
.../hive/ql/exec/tez/WorkloadManager.java | 69 +++++++++++++-------
.../hive/ql/exec/tez/TestWorkloadManager.java | 27 +++++++-
2 files changed, 71 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4e43ec7c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index cda2bba..387d078 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -876,8 +876,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
session.getWmContext(), session.extractHiveResources());
// We have just removed the session from the same pool, so don't check concurrency here.
pool.initializingSessions.add(sw);
- ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
- Futures.addCallback(getFuture, sw);
+ sw.start();
syncWork.toRestartInUse.add(session);
return;
case IGNORE:
@@ -1143,8 +1142,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
// See SessionInitContext javadoc.
SessionInitContext sw = new SessionInitContext(
queueReq.future, poolName, queueReq.queryId, queueReq.wmContext, null);
- ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
- Futures.addCallback(getFuture, sw);
+ sw.start();
// It is possible that all the async methods returned on the same thread because the
// session with registry data and stuff was available in the pool.
// If this happens, we'll take the session out here and "cancel" the init so we skip
@@ -1778,6 +1776,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
* for async session initialization, as well as parallel cancellation.
*/
private final class SessionInitContext implements FutureCallback<WmTezSession> {
+ private final static int MAX_ATTEMPT_NUMBER = 1; // Retry once.
+
private final String poolName, queryId;
private final ReentrantLock lock = new ReentrantLock();
@@ -1788,6 +1788,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
private HiveResources prelocalizedResources;
private Path pathToDelete;
private WmContext wmContext;
+ private int attemptNumber = 0;
public SessionInitContext(SettableFuture<WmTezSession> future,
String poolName, String queryId, WmContext wmContext,
@@ -1800,6 +1801,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
this.wmContext = wmContext;
}
+ public void start() throws Exception {
+ ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+ Futures.addCallback(getFuture, this);
+ }
+
@Override
public void onSuccess(WmTezSession session) {
SessionInitState oldState;
@@ -1856,16 +1862,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
}
case WAITING_FOR_REGISTRY: {
// Notify the master thread and the user.
- future.set(session);
notifyInitializationCompleted(this);
+ future.set(session);
break;
}
case CANCELED: {
// Return session to the pool; we can do it directly here.
future.setException(new HiveException(
"The query was killed by workload management: " + cancelReason));
- session.setPoolName(null);
- session.setClusterFraction(0f);
+ session.clearWm();
session.setQueryId(null);
session.setWmContext(null);
tezAmPool.returnSession(session);
@@ -1883,36 +1888,72 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
public void onFailure(Throwable t) {
SettableFuture<WmTezSession> future;
WmTezSession session;
- boolean wasCanceled = false;
+ boolean wasCanceled = false, doRetry = false;
lock.lock();
try {
wasCanceled = (state == SessionInitState.CANCELED);
session = this.session;
- future = this.future;
- this.future = null;
this.session = null;
- if (!wasCanceled) {
- this.state = SessionInitState.DONE;
+ doRetry = !wasCanceled && (attemptNumber < MAX_ATTEMPT_NUMBER);
+ if (doRetry) {
+ ++attemptNumber;
+ this.state = SessionInitState.GETTING;
+ future = null;
+ } else {
+ future = this.future;
+ this.future = null;
+ if (!wasCanceled) {
+ this.state = SessionInitState.DONE;
+ }
}
} finally {
lock.unlock();
}
- future.setException(t);
+ if (doRetry) {
+ try {
+ start();
+ return;
+ } catch (Exception e) {
+ LOG.error("Failed to retry; propagating original error. The new error is ", e);
+ } finally {
+ // The failed session is discarded whether or not the retry could be started.
+ discardSessionOnFailure(session);
+ }
+ // The retry could not be started. The failed session is already discarded, so don't
+ // replace it twice; the caller's future was left in place for the retry, so claim it
+ // now (re-checking for cancellation) and fall through to report the original error.
+ session = null;
+ lock.lock();
+ try {
+ wasCanceled = (state == SessionInitState.CANCELED);
+ future = this.future;
+ this.future = null;
+ if (!wasCanceled) {
+ this.state = SessionInitState.DONE;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
if (!wasCanceled) {
if (LOG.isDebugEnabled()) {
- LOG.info("Queueing the initialization failure with " + session);
+ LOG.debug("Queueing the initialization failure with " + session);
}
notifyInitializationCompleted(this); // Report failure to the main thread.
}
- if (session != null) {
- session.clearWm();
- session.setQueryId(null);
- // We can just restart the session if we have received one.
- try {
- tezAmPool.replaceSession(session);
- } catch (Exception e) {
- LOG.error("Failed to restart a failed session", e);
- }
+ future.setException(t);
+ discardSessionOnFailure(session);
+ }
+
+ public void discardSessionOnFailure(WmTezSession session) {
+ if (session == null) return;
+ session.clearWm();
+ session.setQueryId(null);
+ // We can just restart the session if we have received one.
+ try {
+ tezAmPool.replaceSession(session);
+ } catch (Exception e) {
+ LOG.error("Failed to restart a failed session", e);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4e43ec7c/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 17f660d..62af917 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -150,6 +150,8 @@ public class TestWorkloadManager {
public static class WorkloadManagerForTest extends WorkloadManager {
+ private SettableFuture<Boolean> failedWait;
+
public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
QueryAllocationManager qam) throws ExecutionException, InterruptedException {
super(null, yarnQueue, conf, qam, createDummyPlan(numSessions));
@@ -180,7 +182,12 @@ public class TestWorkloadManager {
@Override
protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
conf = conf == null ? new HiveConf(getConf()) : conf;
- return new SampleTezSessionState(sessionId, this, conf);
+ SampleTezSessionState sess = new SampleTezSessionState(sessionId, this, conf);
+ if (failedWait != null) {
+ sess.setWaitForAmRegistryFuture(failedWait);
+ failedWait = null;
+ }
+ return sess;
}
@Override
@@ -215,6 +222,10 @@ public class TestWorkloadManager {
ensureWm();
return session;
}
+
+ public void setNextWaitForAmRegistryFuture(SettableFuture<Boolean> failedWait) {
+ this.failedWait = failedWait;
+ }
}
@Test(timeout = 10000)
@@ -1014,7 +1025,7 @@ public class TestWorkloadManager {
WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
Lists.newArrayList(pool("A", 1, 1.0f)));
plan.setMappings(Lists.newArrayList(mapping("A", "A")));
- final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+ final WorkloadManagerForTest wm = new WorkloadManagerForTest("test", conf, qam, plan);
wm.start();
// Make sure session init gets stuck in init.
@@ -1058,10 +1069,19 @@ public class TestWorkloadManager {
error.set(null);
theOnlySession = validatePoolAfterCleanup(theOnlySession, conf, wm, pool, "B");
- // Initialization fails, no resource plan change.
+ // Initialization fails with retry, no resource plan change.
SettableFuture<Boolean> failedWait = SettableFuture.create();
failedWait.setException(new Exception("foo"));
theOnlySession.setWaitForAmRegistryFuture(failedWait);
+ TezSessionState retriedSession = wm.getSession(null, new MappingInput("A"), conf);
+ assertNotNull(retriedSession);
+ assertNotSame(theOnlySession, retriedSession); // Should have been replaced.
+ retriedSession.returnToSessionManager();
+ theOnlySession = (SampleTezSessionState)retriedSession;
+
+ // Initialization fails and so does the retry, no resource plan change.
+ theOnlySession.setWaitForAmRegistryFuture(failedWait);
+ wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry.
try {
TezSessionState r = wm.getSession(null, new MappingInput("A"), conf);
fail("Expected an error but got " + r);
@@ -1073,6 +1093,7 @@ public class TestWorkloadManager {
// Init fails, but the session is also killed by WM before that.
failedWait = SettableFuture.create();
theOnlySession.setWaitForAmRegistryFuture(failedWait);
+ wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry.
sessionA.set(null);
cdl = new CountDownLatch(1);
t1 = new Thread(new GetSessionRunnable(sessionA, wm, error, conf, cdl, "A"));