You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/15 09:58:33 UTC

svn commit: r1542206 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: TezJobMonitor.java TezSessionState.java

Author: gunther
Date: Fri Nov 15 08:58:32 2013
New Revision: 1542206

URL: http://svn.apache.org/r1542206
Log:
HIVE-5832: Add shutdown hook to stop tez dag/session if jvm dies (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1542206&r1=1542205&r2=1542206&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Fri Nov 15 08:58:32 2013
@@ -21,7 +21,10 @@ package org.apache.hadoop.hive.ql.exec.t
 import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -54,7 +57,32 @@ public class TezJobMonitor {
   private final int printInterval = 3000;
   private long lastPrintTime;
   private Set<String> completed;
+  private static final List<DAGClient> shutdownList;
 
+  static {
+    shutdownList = Collections.synchronizedList(new LinkedList<DAGClient>());
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        for (DAGClient c: shutdownList) {
+          try {
+            System.err.println("Trying to shutdown DAG");
+            c.tryKillDAG();
+          } catch (Exception e) {
+            // ignore
+          }
+        }
+        try {
+          for (TezSessionState s: TezSessionState.getOpenSessions()) {
+            System.err.println("Shutting down tez session.");
+            s.close(false);
+          }
+        } catch (Exception e) {
+          // ignore
+        }
+      }
+    });
+  }
 
   public TezJobMonitor() {
     console = new LogHelper(LOG);
@@ -67,7 +95,7 @@ public class TezJobMonitor {
    * @param dagClient client that was used to kick off the job
    * @return int 0 - success, 1 - killed, 2 - failed
    */
-  public int monitorExecution(DAGClient dagClient) throws InterruptedException {
+  public int monitorExecution(final DAGClient dagClient) throws InterruptedException {
     DAGStatus status = null;
     completed = new HashSet<String>();
 
@@ -79,6 +107,8 @@ public class TezJobMonitor {
     String lastReport = null;
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
 
+    shutdownList.add(dagClient);
+
     console.printInfo("\n");
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
@@ -163,6 +193,7 @@ public class TezJobMonitor {
               console.printError(diag);
             }
           }
+          shutdownList.remove(dagClient);
           break;
         }
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1542206&r1=1542205&r2=1542206&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Nov 15 08:58:32 2013
@@ -20,7 +20,10 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import javax.security.auth.login.LoginException;
@@ -56,6 +59,9 @@ public class TezSessionState {
   private TezSession session;
   private String sessionId;
 
+  private static List<TezSessionState> openSessions
+    = Collections.synchronizedList(new LinkedList<TezSessionState>());
+
   /**
    * Constructor. We do not automatically connect, because we only want to
    * load tez classes when the user has tez installed.
@@ -71,6 +77,14 @@ public class TezSessionState {
   }
 
   /**
+   * Get all open sessions. Only used to clean up at shutdown.
+   * @return List<TezSessionState>
+   */
+  public static List<TezSessionState> getOpenSessions() {
+    return openSessions;
+  }
+
+  /**
    * Creates a tez session. A session is tied to either a cli/hs2 session. You can
    * submit multiple DAGs against a session (as long as they are executed serially).
    * @throws IOException
@@ -116,6 +130,8 @@ public class TezSessionState {
     // id is used for tez to reuse the current session rather than start a new one.
     conf.set("mapreduce.framework.name", "yarn-tez");
     conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString());
+
+    openSessions.add(this);
   }
 
   /**
@@ -132,6 +148,7 @@ public class TezSessionState {
     LOG.info("Closing Tez Session");
     try {
       session.stop();
+      openSessions.remove(this);
     } catch (SessionNotRunning nr) {
       // ignore
     }