You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/12/03 01:17:18 UTC

hive git commit: HIVE-12556 : Ctrl-C in beeline doesn't kill Tez query on HS2 (Sergey Shelukhin, reviewed by Gunther Hagleitner)

Repository: hive
Updated Branches:
  refs/heads/master 187829fa9 -> 8d22a60c8


HIVE-12556 : Ctrl-C in beeline doesn't kill Tez query on HS2 (Sergey Shelukhin, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d22a60c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d22a60c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d22a60c

Branch: refs/heads/master
Commit: 8d22a60c8c4a247aa396b8c3841b6ebdce51f508
Parents: 187829f
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 2 16:16:35 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 2 16:16:59 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   5 +
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  |  21 +++-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 109 ++++++++++++++++++-
 3 files changed, 124 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 8fafd61..62b608c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1901,6 +1901,11 @@ public class Driver implements CommandProcessor {
 
   public int close() {
     try {
+      try {
+        releaseResources();
+      } catch (Exception e) {
+        LOG.info("Exception while releasing resources", e);
+      }
       if (fetchTask != null) {
         try {
           fetchTask.clearFetch();

http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 59e9d29..f6bc19c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -22,6 +22,7 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 import static org.fusesource.jansi.Ansi.ansi;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.PrintStream;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
@@ -135,9 +136,7 @@ public class TezJobMonitor {
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
       public void run() {
-        for (DAGClient c: shutdownList) {
-          TezJobMonitor.killRunningJobs();
-        }
+        TezJobMonitor.killRunningJobs();
         try {
           for (TezSessionState s : TezSessionPoolManager.getInstance().getOpenSessions()) {
             System.err.println("Shutting down tez session.");
@@ -346,8 +345,8 @@ public class TezJobMonitor {
         }
       } catch (Exception e) {
         console.printInfo("Exception: " + e.getMessage());
-        if (++failedCounter % maxRetryInterval / checkInterval == 0
-            || e instanceof InterruptedException) {
+        boolean isInterrupted = hasInterruptedException(e);
+        if (isInterrupted || (++failedCounter % maxRetryInterval / checkInterval == 0)) {
           try {
             console.printInfo("Killing DAG...");
             dagClient.tryKillDAG();
@@ -376,10 +375,22 @@ public class TezJobMonitor {
         }
       }
     }
+
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     return rc;
   }
 
+  private static boolean hasInterruptedException(Throwable e) {
+    // Hadoop IPC wraps InterruptedException, so check e and every exception in its cause chain.
+    while (e != null) {
+      if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+        return true;
+      }
+      e = e.getCause();
+    }
+    return false;
+  }
+
   /**
    * killRunningJobs tries to terminate execution of all
    * currently running tez queries. No guarantees, best effort only.

http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a6d911d..a2060da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,6 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -52,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.tez.client.CallerContext;
@@ -64,10 +69,13 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
 import org.json.JSONObject;
 
 /**
@@ -86,6 +94,8 @@ public class TezTask extends Task<TezWork> {
 
   private final DagUtils utils;
 
+  private DAGClient dagClient = null;
+
   Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
   Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
 
@@ -107,7 +117,6 @@ public class TezTask extends Task<TezWork> {
     int rc = 1;
     boolean cleanContext = false;
     Context ctx = null;
-    DAGClient client = null;
     TezSessionState session = null;
 
     try {
@@ -177,12 +186,12 @@ public class TezTask extends Task<TezWork> {
       addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
 
       // submit will send the job to the cluster and start executing
-      client = submit(jobConf, dag, scratchDir, appJarLr, session,
+      dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
           additionalLr, inputOutputJars, inputOutputLocalResources);
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
-      rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag);
+      rc = monitor.monitorExecution(dagClient, ctx.getHiveTxnManager(), conf, dag);
       if (rc != 0) {
         this.setException(new HiveException(monitor.getDiagnostics()));
       }
@@ -190,7 +199,7 @@ public class TezTask extends Task<TezWork> {
       // fetch the counters
       try {
         Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-        counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
+        counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
       } catch (Exception err) {
         // Don't fail execution due to counters - just don't print summary info
         LOG.error("Failed to get counters: " + err, err);
@@ -231,7 +240,7 @@ public class TezTask extends Task<TezWork> {
         }
       }
       // need to either move tmp files or remove them
-      if (client != null) {
+      if (dagClient != null) {
         // rc will only be overwritten if close errors out
         rc = close(work, rc);
       }
@@ -462,7 +471,7 @@ public class TezTask extends Task<TezWork> {
     }
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
-    return dagClient;
+    return new SyncDagClient(dagClient);
   }
 
   /*
@@ -544,4 +553,92 @@ public class TezTask extends Task<TezWork> {
 
     return ((ReduceWork)children.get(0)).getReducer();
   }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (dagClient != null) { // stays null until the result of submit() is stored in the field
+      LOG.info("Shutting down Tez task " + this);
+      try {
+        dagClient.tryKillDAG();
+        LOG.info("Waiting for Tez task to shut down: " + this);
+        dagClient.waitForCompletion();
+      } catch (Exception ex) { // logged at INFO and not rethrown
+        LOG.info("Failed to shut down TezTask" + this, ex); // NOTE(review): no space before the task
+      }
+    }
+  }
+
+  /** DAGClient wrapper that synchronizes on the wrapped client for status, kill and wait calls;
+   * the Tez DAG client is not thread safe and getting a 2nd one is not recommended. */
+  public class SyncDagClient extends DAGClient {
+    private final DAGClient dagClient;
+
+    public SyncDagClient(DAGClient dagClient) {
+      super();
+      this.dagClient = dagClient;
+    }
+
+    @Override
+    public void close() throws IOException {
+      dagClient.close(); // Don't sync.
+    }
+
+    @Override
+    public String getExecutionContext() {
+      return dagClient.getExecutionContext(); // Don't sync.
+    }
+
+    @Override
+    @Private
+    protected ApplicationReport getApplicationReportInternal() {
+      throw new UnsupportedOperationException(); // The method is not exposed, and we don't use it.
+    }
+
+    @Override
+    public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions)
+        throws IOException, TezException {
+      synchronized (dagClient) {
+        return dagClient.getDAGStatus(statusOptions);
+      }
+    }
+
+    @Override
+    public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+        long timeout) throws IOException, TezException {
+      synchronized (dagClient) {
+        return dagClient.getDAGStatus(statusOptions, timeout);
+      }
+    }
+
+    @Override
+    public VertexStatus getVertexStatus(String vertexName,
+        Set<StatusGetOpts> statusOptions) throws IOException, TezException {
+      synchronized (dagClient) {
+        return dagClient.getVertexStatus(vertexName, statusOptions);
+      }
+    }
+
+    @Override
+    public void tryKillDAG() throws IOException, TezException {
+      synchronized (dagClient) {
+        dagClient.tryKillDAG();
+      }
+    }
+
+    @Override
+    public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
+      synchronized (dagClient) { // NOTE(review): the lock is held for the whole wait
+        return dagClient.waitForCompletion();
+      }
+    }
+
+    @Override
+    public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts)
+        throws IOException, TezException, InterruptedException {
+      synchronized (dagClient) { // NOTE(review): the lock is held for the whole wait
+        return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
+      }
+    }
+  }
 }