You are viewing a plain text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/14 07:45:52 UTC

svn commit: r1531795 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: exec/tez/TezJobMonitor.java exec/tez/TezSessionState.java exec/tez/TezTask.java session/SessionState.java

Author: gunther
Date: Mon Oct 14 05:45:52 2013
New Revision: 1531795

URL: http://svn.apache.org/r1531795
Log:
HIVE-5533: Re-connect Tez session after AM timeout (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1531795&r1=1531794&r2=1531795&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Oct 14 05:45:52 2013
@@ -69,7 +69,7 @@ public class TezJobMonitor {
     boolean done = false;
     int checkInterval = 500;
     int printInterval = 3000;
-    int maxRetryInterval = 5000;
+    int maxRetryInterval = 2500;
     int counter = 0;
     int failedCounter = 0;
     int rc = 0;
@@ -86,7 +86,6 @@ public class TezJobMonitor {
       try {
         status = dagClient.getDAGStatus();
         Map<String, Progress> progressMap = status.getVertexProgress();
-        failedCounter = 0;
         DAGStatus.State state = status.getState();
 
         if (state != lastState || state == RUNNING) {
@@ -132,9 +131,15 @@ public class TezJobMonitor {
             break;
           }
         }
+        if (!done) {
+          Thread.sleep(checkInterval);
+        }
       } catch (Exception e) {
-        if (failedCounter % maxRetryInterval/checkInterval == 0) {
+        console.printInfo("Exception: "+e.getMessage());
+        if (++failedCounter % maxRetryInterval/checkInterval == 0
+            || e instanceof InterruptedException) {
           try {
+            console.printInfo("Killing DAG...");
             dagClient.tryKillDAG();
           } catch(IOException io) {
             // best effort
@@ -145,18 +150,19 @@ public class TezJobMonitor {
           console.printError("Execution has failed.");
           rc = 1;
           done = true;
+        } else {
+          console.printInfo("Retrying...");
         }
-      }
-
-      if (done) {
-        if (rc != 0 && status != null) {
-          for (String diag: status.getDiagnostics()) {
-            console.printError(diag);
+      } finally {
+        if (done) {
+          if (rc != 0 && status != null) {
+            for (String diag: status.getDiagnostics()) {
+              console.printError(diag);
+            }
           }
+          break;
         }
-        break;
       }
-      Thread.sleep(500);
     }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     return rc;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1531795&r1=1531794&r2=1531795&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Oct 14 05:45:52 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -131,15 +132,22 @@ public class TezSessionState {
    * @throws IOException
    * @throws TezException
    */
-  public void close() throws TezException, IOException {
+  public void close(boolean keepTmpDir) throws TezException, IOException {
     if (!isOpen()) {
       return;
     }
 
     LOG.info("Closing Tez Session");
-    session.stop();
-    FileSystem fs = tezScratchDir.getFileSystem(conf);
-    fs.delete(tezScratchDir, true);
+    try {
+      session.stop();
+    } catch (SessionNotRunning nr) {
+      // ignore
+    }
+
+    if (!keepTmpDir) {
+      FileSystem fs = tezScratchDir.getFileSystem(conf);
+      fs.delete(tezScratchDir, true);
+    }
     session = null;
     tezScratchDir = null;
     conf = null;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1531795&r1=1531794&r2=1531795&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Oct 14 05:45:52 2013
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -34,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
@@ -45,6 +49,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.tez.client.TezSession;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -194,12 +199,34 @@ public class TezTask extends Task<TezWor
 
   private DAGClient submit(JobConf conf, DAG dag, Path scratchDir,
       LocalResource appJarLr, TezSession session)
-      throws IOException, TezException, InterruptedException {
+      throws IOException, TezException, InterruptedException,
+      LoginException, URISyntaxException, HiveException {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
+    DAGClient dagClient = null;
+
+    try {
+      // ready to start execution on the cluster
+      dagClient = session.submitDAG(dag);
+    } catch (SessionNotRunning nr) {
+      console.printInfo("Tez session was closed. Reopening...");
 
-    // ready to start execution on the cluster
-    DAGClient dagClient = session.submitDAG(dag);
+      // Need to remove this static hack. But this is the way currently to
+      // get a session.
+      SessionState ss = SessionState.get();
+      TezSessionState tezSession = ss.getTezSession();
+
+      // close the old one, but keep the tmp files around
+      tezSession.close(true);
+
+      // (re)open the session
+      tezSession.open(ss.getSessionId(), this.conf);
+      session = tezSession.getSession();
+
+      console.printInfo("Session re-established.");
+
+      dagClient = session.submitDAG(dag);
+    }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
     return dagClient;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1531795&r1=1531794&r2=1531795&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Oct 14 05:45:52 2013
@@ -808,7 +808,7 @@ public class SessionState {
     }
 
     try {
-      tezSessionState.close();
+      tezSessionState.close(false);
     } catch (Exception e) {
       LOG.info("Error closing tez session", e);
     }