You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/12/21 02:57:20 UTC

hive git commit: HIVE-18078 : WM getSession needs some retry logic (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 3407e723a -> 4e43ec7c4


HIVE-18078 : WM getSession needs some retry logic (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4e43ec7c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4e43ec7c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4e43ec7c

Branch: refs/heads/master
Commit: 4e43ec7c4c623537bf46b74ba5c01f314db72162
Parents: 3407e72
Author: sergey <se...@apache.org>
Authored: Wed Dec 20 18:51:51 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Dec 20 18:51:51 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/tez/WorkloadManager.java       | 69 +++++++++++++-------
 .../hive/ql/exec/tez/TestWorkloadManager.java   | 27 +++++++-
 2 files changed, 71 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4e43ec7c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index cda2bba..387d078 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -876,8 +876,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
           session.getWmContext(), session.extractHiveResources());
       // We have just removed the session from the same pool, so don't check concurrency here.
       pool.initializingSessions.add(sw);
-      ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
-      Futures.addCallback(getFuture, sw);
+      sw.start();
       syncWork.toRestartInUse.add(session);
       return;
     case IGNORE:
@@ -1143,8 +1142,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       // See SessionInitContext javadoc.
       SessionInitContext sw = new SessionInitContext(
           queueReq.future, poolName, queueReq.queryId, queueReq.wmContext, null);
-      ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
-      Futures.addCallback(getFuture, sw);
+      sw.start();
       // It is possible that all the async methods returned on the same thread because the
       // session with registry data and stuff was available in the pool.
       // If this happens, we'll take the session out here and "cancel" the init so we skip
@@ -1778,6 +1776,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
    * for async session initialization, as well as parallel cancellation.
    */
   private final class SessionInitContext implements FutureCallback<WmTezSession> {
+    private final static int MAX_ATTEMPT_NUMBER = 1; // Retry once.
+
     private final String poolName, queryId;
 
     private final ReentrantLock lock = new ReentrantLock();
@@ -1788,6 +1788,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private HiveResources prelocalizedResources;
     private Path pathToDelete;
     private WmContext wmContext;
+    private int attemptNumber = 0;
 
     public SessionInitContext(SettableFuture<WmTezSession> future,
         String poolName, String queryId, WmContext wmContext,
@@ -1800,6 +1801,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       this.wmContext = wmContext;
     }
 
+    public void start() throws Exception {
+      ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
+      Futures.addCallback(getFuture, this);
+    }
+
     @Override
     public void onSuccess(WmTezSession session) {
       SessionInitState oldState;
@@ -1856,16 +1862,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       }
       case WAITING_FOR_REGISTRY: {
         // Notify the master thread and the user.
-        future.set(session);
         notifyInitializationCompleted(this);
+        future.set(session);
         break;
       }
       case CANCELED: {
         // Return session to the pool; we can do it directly here.
         future.setException(new HiveException(
             "The query was killed by workload management: " + cancelReason));
-        session.setPoolName(null);
-        session.setClusterFraction(0f);
+        session.clearWm();
         session.setQueryId(null);
         session.setWmContext(null);
         tezAmPool.returnSession(session);
@@ -1883,36 +1888,56 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     public void onFailure(Throwable t) {
       SettableFuture<WmTezSession> future;
       WmTezSession session;
-      boolean wasCanceled = false;
+      boolean wasCanceled = false, doRetry = false;
       lock.lock();
       try {
         wasCanceled = (state == SessionInitState.CANCELED);
         session = this.session;
-        future = this.future;
-        this.future = null;
         this.session = null;
-        if (!wasCanceled) {
-          this.state = SessionInitState.DONE;
+        doRetry = !wasCanceled && (attemptNumber < MAX_ATTEMPT_NUMBER);
+        if (doRetry) {
+          ++attemptNumber;
+          this.state = SessionInitState.GETTING;
+          future = null;
+        } else {
+          future = this.future;
+          this.future = null;
+          if (!wasCanceled) {
+            this.state = SessionInitState.DONE;
+          }
         }
       } finally {
         lock.unlock();
       }
-      future.setException(t);
+      if (doRetry) {
+        try {
+          start();
+          return;
+        } catch (Exception e) {
+          LOG.error("Failed to retry; propagating original error. The new error is ", e);
+        } finally {
+          discardSessionOnFailure(session);
+        }
+      }
       if (!wasCanceled) {
         if (LOG.isDebugEnabled()) {
           LOG.info("Queueing the initialization failure with " + session);
         }
         notifyInitializationCompleted(this); // Report failure to the main thread.
       }
-      if (session != null) {
-        session.clearWm();
-        session.setQueryId(null);
-        // We can just restart the session if we have received one.
-        try {
-          tezAmPool.replaceSession(session);
-        } catch (Exception e) {
-          LOG.error("Failed to restart a failed session", e);
-        }
+      future.setException(t);
+      discardSessionOnFailure(session);
+    }
+
+    public void discardSessionOnFailure(WmTezSession session) {
+      if (session == null) return;
+      session.clearWm();
+      session.setQueryId(null);
+      // We can just restart the session if we have received one.
+      try {
+        tezAmPool.replaceSession(session);
+      } catch (Exception e) {
+        LOG.error("Failed to restart a failed session", e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/4e43ec7c/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
index 17f660d..62af917 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestWorkloadManager.java
@@ -150,6 +150,8 @@ public class TestWorkloadManager {
 
   public static class WorkloadManagerForTest extends WorkloadManager {
 
+    private SettableFuture<Boolean> failedWait;
+
     public WorkloadManagerForTest(String yarnQueue, HiveConf conf, int numSessions,
         QueryAllocationManager qam) throws ExecutionException, InterruptedException {
       super(null, yarnQueue, conf, qam, createDummyPlan(numSessions));
@@ -180,7 +182,12 @@ public class TestWorkloadManager {
     @Override
     protected WmTezSession createSessionObject(String sessionId, HiveConf conf) {
       conf = conf == null ? new HiveConf(getConf()) : conf;
-      return new SampleTezSessionState(sessionId, this, conf);
+      SampleTezSessionState sess = new SampleTezSessionState(sessionId, this, conf);
+      if (failedWait != null) {
+        sess.setWaitForAmRegistryFuture(failedWait);
+        failedWait = null;
+      }
+      return sess;
     }
 
     @Override
@@ -215,6 +222,10 @@ public class TestWorkloadManager {
       ensureWm();
       return session;
     }
+
+    public void setNextWaitForAmRegistryFuture(SettableFuture<Boolean> failedWait) {
+      this.failedWait = failedWait;
+    }
   }
 
   @Test(timeout = 10000)
@@ -1014,7 +1025,7 @@ public class TestWorkloadManager {
     WMFullResourcePlan plan = new WMFullResourcePlan(plan(),
         Lists.newArrayList(pool("A", 1, 1.0f)));
     plan.setMappings(Lists.newArrayList(mapping("A", "A")));
-    final WorkloadManager wm = new WorkloadManagerForTest("test", conf, qam, plan);
+    final WorkloadManagerForTest wm = new WorkloadManagerForTest("test", conf, qam, plan);
     wm.start();
 
     // Make sure session init gets stuck in init.
@@ -1058,10 +1069,19 @@ public class TestWorkloadManager {
     error.set(null);
     theOnlySession = validatePoolAfterCleanup(theOnlySession, conf, wm, pool, "B");
 
-    // Initialization fails, no resource plan change.
+    // Initialization fails with retry, no resource plan change.
     SettableFuture<Boolean> failedWait = SettableFuture.create();
     failedWait.setException(new Exception("foo"));
     theOnlySession.setWaitForAmRegistryFuture(failedWait);
+    TezSessionState retriedSession = wm.getSession(null, new MappingInput("A"), conf);
+    assertNotNull(retriedSession);
+    assertNotSame(theOnlySession, retriedSession); // Should have been replaced.
+    retriedSession.returnToSessionManager();
+    theOnlySession = (SampleTezSessionState)retriedSession;
+
+    // Initialization fails and so does the retry, no resource plan change.
+    theOnlySession.setWaitForAmRegistryFuture(failedWait);
+    wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry.
     try {
       TezSessionState r = wm.getSession(null, new MappingInput("A"), conf);
       fail("Expected an error but got " + r);
@@ -1073,6 +1093,7 @@ public class TestWorkloadManager {
     // Init fails, but the session is also killed by WM before that.
     failedWait = SettableFuture.create();
     theOnlySession.setWaitForAmRegistryFuture(failedWait);
+    wm.setNextWaitForAmRegistryFuture(failedWait); // Fail the retry.
     sessionA.set(null);
     cdl = new CountDownLatch(1);
     t1 = new Thread(new GetSessionRunnable(sessionA, wm, error, conf, cdl, "A"));