You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/15 09:58:33 UTC
svn commit: r1542206 - in
/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez:
TezJobMonitor.java TezSessionState.java
Author: gunther
Date: Fri Nov 15 08:58:32 2013
New Revision: 1542206
URL: http://svn.apache.org/r1542206
Log:
HIVE-5832: Add shutdown hook to stop tez dag/session if jvm dies (Gunther Hagleitner)
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1542206&r1=1542205&r2=1542206&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Fri Nov 15 08:58:32 2013
@@ -21,7 +21,10 @@ package org.apache.hadoop.hive.ql.exec.t
import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@@ -54,7 +57,32 @@ public class TezJobMonitor {
private final int printInterval = 3000;
private long lastPrintTime;
private Set<String> completed;
+ private static final List<DAGClient> shutdownList;
+ static {
+ shutdownList = Collections.synchronizedList(new LinkedList<DAGClient>());
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ for (DAGClient c: shutdownList) {
+ try {
+ System.err.println("Trying to shutdown DAG");
+ c.tryKillDAG();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ try {
+ for (TezSessionState s: TezSessionState.getOpenSessions()) {
+ System.err.println("Shutting down tez session.");
+ s.close(false);
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ });
+ }
public TezJobMonitor() {
console = new LogHelper(LOG);
@@ -67,7 +95,7 @@ public class TezJobMonitor {
* @param dagClient client that was used to kick off the job
* @return int 0 - success, 1 - killed, 2 - failed
*/
- public int monitorExecution(DAGClient dagClient) throws InterruptedException {
+ public int monitorExecution(final DAGClient dagClient) throws InterruptedException {
DAGStatus status = null;
completed = new HashSet<String>();
@@ -79,6 +107,8 @@ public class TezJobMonitor {
String lastReport = null;
Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
+ shutdownList.add(dagClient);
+
console.printInfo("\n");
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
@@ -163,6 +193,7 @@ public class TezJobMonitor {
console.printError(diag);
}
}
+ shutdownList.remove(dagClient);
break;
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1542206&r1=1542205&r2=1542206&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Nov 15 08:58:32 2013
@@ -20,7 +20,10 @@ package org.apache.hadoop.hive.ql.exec.t
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import javax.security.auth.login.LoginException;
@@ -56,6 +59,9 @@ public class TezSessionState {
private TezSession session;
private String sessionId;
+ private static List<TezSessionState> openSessions
+ = Collections.synchronizedList(new LinkedList<TezSessionState>());
+
/**
* Constructor. We do not automatically connect, because we only want to
* load tez classes when the user has tez installed.
@@ -71,6 +77,14 @@ public class TezSessionState {
}
/**
+ * Get all open sessions. Only used to clean up at shutdown.
+ * @return List<TezSessionState>
+ */
+ public static List<TezSessionState> getOpenSessions() {
+ return openSessions;
+ }
+
+ /**
* Creates a tez session. A session is tied to either a cli/hs2 session. You can
* submit multiple DAGs against a session (as long as they are executed serially).
* @throws IOException
@@ -116,6 +130,8 @@ public class TezSessionState {
// id is used for tez to reuse the current session rather than start a new one.
conf.set("mapreduce.framework.name", "yarn-tez");
conf.set("mapreduce.tez.session.tokill-application-id", session.getApplicationId().toString());
+
+ openSessions.add(this);
}
/**
@@ -132,6 +148,7 @@ public class TezSessionState {
LOG.info("Closing Tez Session");
try {
session.stop();
+ openSessions.remove(this);
} catch (SessionNotRunning nr) {
// ignore
}