You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/14 07:45:52 UTC
svn commit: r1531795 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: exec/tez/TezJobMonitor.java exec/tez/TezSessionState.java exec/tez/TezTask.java session/SessionState.java
Author: gunther
Date: Mon Oct 14 05:45:52 2013
New Revision: 1531795
URL: http://svn.apache.org/r1531795
Log:
HIVE-5533: Re-connect Tez session after AM timeout (Gunther Hagleitner)
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1531795&r1=1531794&r2=1531795&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Oct 14 05:45:52 2013
@@ -69,7 +69,7 @@ public class TezJobMonitor {
boolean done = false;
int checkInterval = 500;
int printInterval = 3000;
- int maxRetryInterval = 5000;
+ int maxRetryInterval = 2500;
int counter = 0;
int failedCounter = 0;
int rc = 0;
@@ -86,7 +86,6 @@ public class TezJobMonitor {
try {
status = dagClient.getDAGStatus();
Map<String, Progress> progressMap = status.getVertexProgress();
- failedCounter = 0;
DAGStatus.State state = status.getState();
if (state != lastState || state == RUNNING) {
@@ -132,9 +131,15 @@ public class TezJobMonitor {
break;
}
}
+ if (!done) {
+ Thread.sleep(checkInterval);
+ }
} catch (Exception e) {
- if (failedCounter % maxRetryInterval/checkInterval == 0) {
+ console.printInfo("Exception: "+e.getMessage());
+ if (++failedCounter % maxRetryInterval/checkInterval == 0
+ || e instanceof InterruptedException) {
try {
+ console.printInfo("Killing DAG...");
dagClient.tryKillDAG();
} catch(IOException io) {
// best effort
@@ -145,18 +150,19 @@ public class TezJobMonitor {
console.printError("Execution has failed.");
rc = 1;
done = true;
+ } else {
+ console.printInfo("Retrying...");
}
- }
-
- if (done) {
- if (rc != 0 && status != null) {
- for (String diag: status.getDiagnostics()) {
- console.printError(diag);
+ } finally {
+ if (done) {
+ if (rc != 0 && status != null) {
+ for (String diag: status.getDiagnostics()) {
+ console.printError(diag);
+ }
}
+ break;
}
- break;
}
- Thread.sleep(500);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
return rc;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1531795&r1=1531794&r2=1531795&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Oct 14 05:45:52 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -131,15 +132,22 @@ public class TezSessionState {
* @throws IOException
* @throws TezException
*/
- public void close() throws TezException, IOException {
+ public void close(boolean keepTmpDir) throws TezException, IOException {
if (!isOpen()) {
return;
}
LOG.info("Closing Tez Session");
- session.stop();
- FileSystem fs = tezScratchDir.getFileSystem(conf);
- fs.delete(tezScratchDir, true);
+ try {
+ session.stop();
+ } catch (SessionNotRunning nr) {
+ // ignore
+ }
+
+ if (!keepTmpDir) {
+ FileSystem fs = tezScratchDir.getFileSystem(conf);
+ fs.delete(tezScratchDir, true);
+ }
session = null;
tezScratchDir = null;
conf = null;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1531795&r1=1531794&r2=1531795&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Oct 14 05:45:52 2013
@@ -19,11 +19,14 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.security.auth.login.LoginException;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -34,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
@@ -45,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.tez.client.TezSession;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
@@ -194,12 +199,34 @@ public class TezTask extends Task<TezWor
private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
LocalResource appJarLr, TezSession session)
- throws IOException, TezException, InterruptedException {
+ throws IOException, TezException, InterruptedException,
+ LoginException, URISyntaxException, HiveException {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
+ DAGClient dagClient = null;
+
+ try {
+ // ready to start execution on the cluster
+ dagClient = session.submitDAG(dag);
+ } catch (SessionNotRunning nr) {
+ console.printInfo("Tez session was closed. Reopening...");
- // ready to start execution on the cluster
- DAGClient dagClient = session.submitDAG(dag);
+ // Need to remove this static hack. But this is the way currently to
+ // get a session.
+ SessionState ss = SessionState.get();
+ TezSessionState tezSession = ss.getTezSession();
+
+ // close the old one, but keep the tmp files around
+ tezSession.close(true);
+
+ // (re)open the session
+ tezSession.open(ss.getSessionId(), this.conf);
+ session = tezSession.getSession();
+
+ console.printInfo("Session re-established.");
+
+ dagClient = session.submitDAG(dag);
+ }
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
return dagClient;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1531795&r1=1531794&r2=1531795&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Oct 14 05:45:52 2013
@@ -808,7 +808,7 @@ public class SessionState {
}
try {
- tezSessionState.close();
+ tezSessionState.close(false);
} catch (Exception e) {
LOG.info("Error closing tez session", e);
}