You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/11/29 20:21:05 UTC

[2/2] hive git commit: HIVE-18076 : killquery doesn't actually work for non-trigger WM kills (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

HIVE-18076 : killquery doesn't actually work for non-trigger WM kills (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b6760b01
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b6760b01
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b6760b01

Branch: refs/heads/master
Commit: b6760b017cf726a82d727d2970cdcfc0e7eebcb2
Parents: 3500196
Author: sergey <se...@apache.org>
Authored: Wed Nov 29 12:17:01 2017 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Nov 29 12:17:01 2017 -0800

----------------------------------------------------------------------
 .../TestLlapTaskSchedulerService.java           |  1 -
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java | 10 ++-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 11 ++-
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   |  1 -
 .../hive/ql/exec/tez/WorkloadManager.java       | 77 +++++++++++---------
 5 files changed, 58 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 5460248..90b31e4 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -429,7 +429,6 @@ public class TestLlapTaskSchedulerService {
       TaskInfo ti1 = tsWrapper.ts.getTaskInfo(task1), ti2 = tsWrapper.ts.getTaskInfo(task2);
       assertFalse(ti1.isGuaranteed() || ti2.isGuaranteed());
 
-      // TODO#       ts.notifyStarted(task);
       tsWrapper.ts.updateGuaranteedCount(1);
       tsWrapper.ts.waitForMessagesSent(1);
       TaskInfo tiHigher = ti1.isGuaranteed() ? ti1 : ti2, tiLower = (tiHigher == ti1) ? ti2 : ti1;

http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 4d2cf88..3bcf657 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -107,6 +107,7 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
     int threadCount = Math.min(initialSize,
         HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
     Preconditions.checkArgument(threadCount > 0);
+    this.parentSessionState = SessionState.get();
     if (threadCount == 1) {
       for (int i = 0; i < initialSize; ++i) {
         SessionType session = sessionObjFactory.create(null);
@@ -115,7 +116,6 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       }
     } else {
       final AtomicInteger remaining = new AtomicInteger(initialSize);
-      this.parentSessionState = SessionState.get();
       @SuppressWarnings("unchecked")
       FutureTask<Boolean>[] threadTasks = new FutureTask[threadCount];
       for (int i = threadTasks.length - 1; i >= 0; --i) {
@@ -272,6 +272,14 @@ class TezSessionPool<SessionType extends TezSessionPoolSession> {
       // The caller probably created the new session with the old config, but update the
       // registry again just in case. TODO: maybe we should enforce that.
       configureAmRegistry(newSession);
+      if (SessionState.get() == null && parentSessionState != null) {
+        // Tez session relies on a threadlocal for open... If we are on some non-session thread,
+        // just use the same SessionState we used for the initial sessions.
+        // Technically, given that all pool sessions are initially based on this state, shouldn't
+        // we also set this at all times and not rely on external session state? We should
+        // probably just get rid of the thread local usage in TezSessionState.
+        SessionState.setCurrentSessionState(parentSessionState);
+      }
       newSession.open(additionalFiles, scratchDir);
       if (!putSessionBack(newSession, false)) {
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 8087b01..af77f30 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -171,9 +171,10 @@ public class TezTask extends Task<TezWork> {
 
       TriggerContext triggerContext = ctx.getTriggerContext();
       triggerContext.setDesiredCounters(desiredCounters);
-      session.setTriggerContext(triggerContext);
-      LOG.info("Subscribed to counters: {} for queryId: {}", desiredCounters, triggerContext.getQueryId());
+      LOG.info("Subscribed to counters: {} for queryId: {}",
+          desiredCounters, triggerContext.getQueryId());
       ss.setTezSession(session);
+      session.setTriggerContext(triggerContext);
       try {
         // jobConf will hold all the configuration for hadoop, tez, and hive
         JobConf jobConf = utils.createConfiguration(conf);
@@ -190,8 +191,7 @@ public class TezTask extends Task<TezWork> {
 
         // This is used to compare global and vertex resources. Global resources are originally
         // derived from session conf via localizeTempFilesFromConf. So, use that here.
-        Configuration sessionConf =
-            (session != null && session.getConf() != null) ? session.getConf() : conf;
+        Configuration sessionConf = (session.getConf() != null) ? session.getConf() : conf;
         Map<String,LocalResource> inputOutputLocalResources =
             getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf);
 
@@ -584,7 +584,10 @@ public class TezTask extends Task<TezWork> {
       try {
         console.printInfo("Dag submit failed due to " + e.getMessage() + " stack trace: "
             + Arrays.toString(e.getStackTrace()) + " retrying...");
+        // TODO: this is temporary, need to refactor how reopen is invoked.
+        TriggerContext oldCtx = sessionState.getTriggerContext();
         sessionState = sessionState.reopen(conf, inputOutputJars);
+        sessionState.setTriggerContext(oldCtx);
         dagClient = sessionState.getSession().submitDAG(dag);
       } catch (Exception retryException) {
         // we failed to submit after retrying. Destroy session and bail.

http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 96d70c9..d61c531 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -143,7 +143,6 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
   void clearWm() {
     this.poolName = null;
     this.clusterFraction = 0f;
-    this.queryId = null;
   }
 
   double getClusterFraction() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b6760b01/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index a8360bd..388a4f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -1,5 +1,4 @@
 /**
-/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -396,18 +395,22 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         //       query is being killed until both the kill, and the user, return it.
         String queryId = toKill.getQueryId();
         KillQuery kq = toKill.getKillQuery();
-        if (kq != null && queryId != null) {
-          LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
-          try {
-            kq.killQuery(queryId, reason);
-            addKillQueryResult(toKill, true);
-            LOG.debug("Killed " + queryId);
-            return;
-          } catch (HiveException ex) {
-            LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex);
+        try {
+          if (kq != null && queryId != null) {
+            LOG.info("Invoking KillQuery for " + queryId + ": " + reason);
+            try {
+              kq.killQuery(queryId, reason);
+              addKillQueryResult(toKill, true);
+              LOG.debug("Killed " + queryId);
+              return;
+            } catch (HiveException ex) {
+              LOG.error("Failed to kill " + queryId + "; will try to restart AM instead" , ex);
+            }
+          } else {
+            LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq);
           }
-        } else {
-          LOG.info("Will queue restart for {}; queryId {}, killQuery {}", toKill, queryId, kq);
+        } finally {
+          toKill.setQueryId(null);
         }
         // We cannot restart in place because the user might receive a failure and return the
         // session to the master thread without the "irrelevant" flag set. In fact, the query might
@@ -421,12 +424,13 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // 2. Restart pool sessions.
     for (final WmTezSession toRestart : context.toRestartInUse) {
       LOG.info("Replacing {} with a new session", toRestart);
+      toRestart.setQueryId(null);
       workPool.submit(() -> {
         try {
           // Note: sessions in toRestart are always in use, so they cannot expire in parallel.
           tezAmPool.replaceSession(toRestart, false, null);
         } catch (Exception ex) {
-          LOG.error("Failed to restart an old session; ignoring " + ex.getMessage());
+          LOG.error("Failed to restart an old session; ignoring", ex);
         }
       });
     }
@@ -564,7 +568,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // May be change command to support ... DELAYED MOVE TO etl ... which will run under src cluster fraction as long
     // as possible
     for (MoveSession moveSession : e.moveSessions) {
-      handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute);
+      handleMoveSessionOnMasterThread(moveSession, syncWork, poolsToRedistribute, e.toReuse);
     }
     e.moveSessions.clear();
 
@@ -676,9 +680,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
-  private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
-    final WmThreadSyncWork syncWork,
-    final HashSet<String> poolsToRedistribute) {
+  private void handleMoveSessionOnMasterThread(MoveSession moveSession, WmThreadSyncWork syncWork,
+      Set<String> poolsToRedistribute, Map<WmTezSession, GetRequest> toReuse) {
     String destPoolName = moveSession.destPool;
     LOG.info("Handling move session event: {}", moveSession);
     if (validMove(moveSession.srcSession, destPoolName)) {
@@ -689,7 +692,8 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         // check if there is capacity in dest pool, if so move else kill the session
         if (capacityAvailable(destPoolName)) {
           // add to destination pool
-          Boolean added = checkAndAddSessionToAnotherPool(moveSession.srcSession, destPoolName, poolsToRedistribute);
+          Boolean added = checkAndAddSessionToAnotherPool(
+              moveSession.srcSession, destPoolName, poolsToRedistribute);
           if (added != null && added) {
             moveSession.future.set(true);
             return;
@@ -697,10 +701,10 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
             LOG.error("Failed to move session: {}. Session is not added to destination.", moveSession);
           }
         } else {
-          moveSession.srcSession.clearWm();
-          moveSession.srcSession.setIsIrrelevantForWm("Destination pool "
-          + destPoolName + " is full. Killing query.");
-          syncWork.toRestartInUse.add(moveSession.srcSession);
+          WmTezSession session = moveSession.srcSession;
+          resetRemovedSessionToKill(session, toReuse);
+          syncWork.toKillQuery.put(session, new KillQueryContext(session, "Destination pool "
+              + destPoolName + " is full. Killing query."));
         }
       } else {
         LOG.error("Failed to move session: {}. Session is not removed from its pool.", moveSession);
@@ -785,6 +789,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     if (reuseRequest != null) {
       reuseRequest.future.setException(new AssertionError("Invalid reuse attempt"));
     }
+    session.setQueryId(null);
     return checkAndRemoveSessionFromItsPool(session, poolsToRedistribute, isReturn);
   }
 
@@ -1086,6 +1091,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     WmTezSession session = req.sessionToReuse;
     if (session == null) return;
     req.sessionToReuse = null;
+    session.setQueryId(null);
     if (poolsToRedistribute != null) {
       RemoveSessionResult rr = checkAndRemoveSessionFromItsPool(
           session, poolsToRedistribute, true);
@@ -1119,7 +1125,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
    *         in WM but wasn't found in the requisite pool (internal error?).
    */
   private RemoveSessionResult checkAndRemoveSessionFromItsPool(
-      WmTezSession session, HashSet<String> poolsToRedistribute, Boolean isSessionOk) {
+      WmTezSession session, Set<String> poolsToRedistribute, Boolean isSessionOk) {
     // It is possible for some request to be queued after a main thread has decided to kill this
     // session; on the next iteration, we'd be processing that request with an irrelevant session.
     if (session.isIrrelevantForWm()) {
@@ -1146,7 +1152,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
   }
 
   private Boolean checkAndAddSessionToAnotherPool(
-    WmTezSession session, String destPoolName, HashSet<String> poolsToRedistribute) {
+    WmTezSession session, String destPoolName, Set<String> poolsToRedistribute) {
     if (session.isIrrelevantForWm()) {
       // This is called only during move session handling, removing session already checks this.
       // So this is not expected as remove failing will not even invoke this method
@@ -1624,7 +1630,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         IdentityHashMap<WmTezSession, GetRequest> toReuse,
         Map<WmTezSession, KillQueryContext> toKill) {
       for (WmTezSession sessionToKill : sessions) {
-        resetRemovedSession(sessionToKill, toReuse);
+        resetRemovedSessionToKill(sessionToKill, toReuse);
         toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
       }
       sessions.clear();
@@ -1635,21 +1641,12 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         if (sessionToKill == null) {
           continue; // Async op in progress; the callback will take care of this.
         }
-        resetRemovedSession(sessionToKill, toReuse);
+        resetRemovedSessionToKill(sessionToKill, toReuse);
         toKill.put(sessionToKill, new KillQueryContext(sessionToKill, killReason));
       }
       initializingSessions.clear();
     }
 
-    private void resetRemovedSession(WmTezSession sessionToKill,
-        IdentityHashMap<WmTezSession, GetRequest> toReuse) {
-      sessionToKill.clearWm();
-      GetRequest req = toReuse.remove(sessionToKill);
-      if (req != null) {
-        req.sessionToReuse = null;
-      }
-    }
-
     public void setTriggers(final LinkedList<Trigger> triggers) {
       this.triggers = triggers;
     }
@@ -1786,6 +1783,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       }
       if (session != null) {
         session.clearWm();
+        session.setQueryId(null);
         // We can just restart the session if we have received one.
         try {
           tezAmPool.replaceSession(session, false, null);
@@ -1914,6 +1912,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  private static void resetRemovedSessionToKill(
+      WmTezSession sessionToKill, Map<WmTezSession, GetRequest> toReuse) {
+    sessionToKill.clearWm();
+    GetRequest req = toReuse.remove(sessionToKill);
+    if (req != null) {
+      req.sessionToReuse = null;
+    }
+  }
+
   @VisibleForTesting
   TezSessionPool<WmTezSession> getTezAmPool() {
     return tezAmPool;