You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) was not preserved in this copy.
Posted to commits@hive.apache.org by se...@apache.org on 2015/12/03 01:17:18 UTC
hive git commit: HIVE-12556 : Ctrl-C in beeline doesn't kill Tez query on HS2 (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/master 187829fa9 -> 8d22a60c8
HIVE-12556 : Ctrl-C in beeline doesn't kill Tez query on HS2 (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d22a60c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d22a60c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d22a60c
Branch: refs/heads/master
Commit: 8d22a60c8c4a247aa396b8c3841b6ebdce51f508
Parents: 187829f
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Dec 2 16:16:35 2015 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Dec 2 16:16:59 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 5 +
.../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 21 +++-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 109 ++++++++++++++++++-
3 files changed, 124 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 8fafd61..62b608c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1901,6 +1901,11 @@ public class Driver implements CommandProcessor {
public int close() {
try {
+ try {
+ releaseResources();
+ } catch (Exception e) {
+ LOG.info("Exception while releasing resources", e);
+ }
if (fetchTask != null) {
try {
fetchTask.clearFetch();
http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 59e9d29..f6bc19c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -22,6 +22,7 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
import static org.fusesource.jansi.Ansi.ansi;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.PrintStream;
import java.text.DecimalFormat;
import java.text.NumberFormat;
@@ -135,9 +136,7 @@ public class TezJobMonitor {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- for (DAGClient c: shutdownList) {
- TezJobMonitor.killRunningJobs();
- }
+ TezJobMonitor.killRunningJobs();
try {
for (TezSessionState s : TezSessionPoolManager.getInstance().getOpenSessions()) {
System.err.println("Shutting down tez session.");
@@ -346,8 +345,8 @@ public class TezJobMonitor {
}
} catch (Exception e) {
console.printInfo("Exception: " + e.getMessage());
- if (++failedCounter % maxRetryInterval / checkInterval == 0
- || e instanceof InterruptedException) {
+ boolean isInterrupted = hasInterruptedException(e);
+ if (isInterrupted || (++failedCounter % maxRetryInterval / checkInterval == 0)) {
try {
console.printInfo("Killing DAG...");
dagClient.tryKillDAG();
@@ -376,10 +375,22 @@ public class TezJobMonitor {
}
}
}
+
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
return rc;
}
+ private static boolean hasInterruptedException(Throwable e) {
+ // Hadoop IPC wraps InterruptedException. GRRR.
+ while (e != null) {
+ if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+ return true;
+ }
+ e = e.getCause();
+ }
+ return false;
+ }
+
/**
* killRunningJobs tries to terminate execution of all
* currently running tez queries. No guarantees, best effort only.
http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a6d911d..a2060da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -29,6 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -52,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.tez.client.CallerContext;
@@ -64,10 +69,13 @@ import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
import org.json.JSONObject;
/**
@@ -86,6 +94,8 @@ public class TezTask extends Task<TezWork> {
private final DagUtils utils;
+ private DAGClient dagClient = null;
+
Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
@@ -107,7 +117,6 @@ public class TezTask extends Task<TezWork> {
int rc = 1;
boolean cleanContext = false;
Context ctx = null;
- DAGClient client = null;
TezSessionState session = null;
try {
@@ -177,12 +186,12 @@ public class TezTask extends Task<TezWork> {
addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources);
// submit will send the job to the cluster and start executing
- client = submit(jobConf, dag, scratchDir, appJarLr, session,
+ dagClient = submit(jobConf, dag, scratchDir, appJarLr, session,
additionalLr, inputOutputJars, inputOutputLocalResources);
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
- rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag);
+ rc = monitor.monitorExecution(dagClient, ctx.getHiveTxnManager(), conf, dag);
if (rc != 0) {
this.setException(new HiveException(monitor.getDiagnostics()));
}
@@ -190,7 +199,7 @@ public class TezTask extends Task<TezWork> {
// fetch the counters
try {
Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
+ counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
} catch (Exception err) {
// Don't fail execution due to counters - just don't print summary info
LOG.error("Failed to get counters: " + err, err);
@@ -231,7 +240,7 @@ public class TezTask extends Task<TezWork> {
}
}
// need to either move tmp files or remove them
- if (client != null) {
+ if (dagClient != null) {
// rc will only be overwritten if close errors out
rc = close(work, rc);
}
@@ -462,7 +471,7 @@ public class TezTask extends Task<TezWork> {
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
- return dagClient;
+ return new SyncDagClient(dagClient);
}
/*
@@ -544,4 +553,92 @@ public class TezTask extends Task<TezWork> {
return ((ReduceWork)children.get(0)).getReducer();
}
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ if (dagClient != null) {
+ LOG.info("Shutting down Tez task " + this);
+ try {
+ dagClient.tryKillDAG();
+ LOG.info("Waiting for Tez task to shut down: " + this);
+ dagClient.waitForCompletion();
+ } catch (Exception ex) {
+ LOG.info("Failed to shut down TezTask" + this, ex);
+ }
+ }
+ }
+
+ /** DAG client that does dumb global sync on all the method calls;
+ * Tez DAG client is not thread safe and getting the 2nd one is not recommended. */
+ public class SyncDagClient extends DAGClient {
+ private final DAGClient dagClient;
+
+ public SyncDagClient(DAGClient dagClient) {
+ super();
+ this.dagClient = dagClient;
+ }
+
+ @Override
+ public void close() throws IOException {
+ dagClient.close(); // Don't sync.
+ }
+
+ @Override
+ public String getExecutionContext() {
+ return dagClient.getExecutionContext(); // Don't sync.
+ }
+
+ @Override
+ @Private
+ protected ApplicationReport getApplicationReportInternal() {
+ throw new UnsupportedOperationException(); // The method is not exposed, and we don't use it.
+ }
+
+ @Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException {
+ synchronized (dagClient) {
+ return dagClient.getDAGStatus(statusOptions);
+ }
+ }
+
+ @Override
+ public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
+ long timeout) throws IOException, TezException {
+ synchronized (dagClient) {
+ return dagClient.getDAGStatus(statusOptions, timeout);
+ }
+ }
+
+ @Override
+ public VertexStatus getVertexStatus(String vertexName,
+ Set<StatusGetOpts> statusOptions) throws IOException, TezException {
+ synchronized (dagClient) {
+ return dagClient.getVertexStatus(vertexName, statusOptions);
+ }
+ }
+
+ @Override
+ public void tryKillDAG() throws IOException, TezException {
+ synchronized (dagClient) {
+ dagClient.tryKillDAG();
+ }
+ }
+
+ @Override
+ public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
+ synchronized (dagClient) {
+ return dagClient.waitForCompletion();
+ }
+ }
+
+ @Override
+ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts)
+ throws IOException, TezException, InterruptedException {
+ synchronized (dagClient) {
+ return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
+ }
+ }
+ }
}