Posted to commits@hive.apache.org by xu...@apache.org on 2017/05/09 16:40:27 UTC
hive git commit: HIVE-16456: Kill spark job when InterruptedException happens or driverContext.isShutdown is true (Zhihai via Xuefu)
Repository: hive
Updated Branches:
refs/heads/master 067d953bf -> 4ba48aa5f
HIVE-16456: Kill spark job when InterruptedException happens or driverContext.isShutdown is true (Zhihai via Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ba48aa5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ba48aa5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ba48aa5
Branch: refs/heads/master
Commit: 4ba48aa5fcaa981ee469161bbf17611aa0392fd2
Parents: 067d953
Author: Xuefu Zhang <xu...@uber.com>
Authored: Tue May 9 09:40:13 2017 -0700
Committer: Xuefu Zhang <xu...@uber.com>
Committed: Tue May 9 09:40:13 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/spark/SparkTask.java | 32 +++++++++++++++++---
1 file changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
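In short, the patch routes every cancellation path through a single killJob() helper: a driver shutdown, a submit that fails, the SPARK_JOB_MAX_TASKS limit, and an interrupt reported by the job monitor all funnel into one method that cancels the Spark job at most once. Below is a minimal, self-contained sketch of that kill-at-most-once pattern; the class name, the AtomicInteger counter, and the main() driver are illustrative stand-ins rather than Hive code, and only the killJob() control flow mirrors the patch.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class KillOnceSketch {
  private final Object jobRef = new Object();   // stands in for SparkJobRef
  private final AtomicInteger cancelCalls = new AtomicInteger();
  private boolean jobKilled = false;

  private void killJob() {
    boolean needToKillJob = false;
    // The unsynchronized read is a fast path; the synchronized re-check
    // below is what guarantees a single winner when callers race.
    if (jobRef != null && !jobKilled) {
      synchronized (this) {
        if (!jobKilled) {
          jobKilled = true;
          needToKillJob = true;
        }
      }
    }
    if (needToKillJob) {
      cancelCalls.incrementAndGet();            // jobRef.cancelJob() in the real code
    }
  }

  public static void main(String[] args) throws InterruptedException {
    KillOnceSketch task = new KillOnceSketch();
    CountDownLatch start = new CountDownLatch(1);
    Runnable caller = () -> {
      try {
        start.await();                          // line both threads up
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      task.killJob();
    };
    Thread shutdownPath = new Thread(caller);   // e.g. shutdown() being called
    Thread monitorPath = new Thread(caller);    // e.g. the monitor's error path
    shutdownPath.start();
    monitorPath.start();
    start.countDown();
    shutdownPath.join();
    monitorPath.join();
    System.out.println("cancelJob() invoked " + task.cancelCalls.get() + " time(s)");
  }
}

Because the synchronized block re-checks jobKilled before claiming the kill, the unsynchronized read outside it can race harmlessly: whichever caller wins performs the single cancel, and the cancellation itself runs outside the lock so a slow cancelJob() cannot block other callers.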
http://git-wip-us.apache.org/repos/asf/hive/blob/4ba48aa5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 98b1605..b4fb49f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -85,6 +85,7 @@ public class SparkTask extends Task<SparkWork> {
   private transient List<Integer> stageIds;
   private transient SparkJobRef jobRef = null;
   private transient boolean isShutdown = false;
+  private transient boolean jobKilled = false;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -112,6 +113,11 @@ public class SparkTask extends Task<SparkWork> {
       jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
+      if (driverContext.isShutdown()) {
+        killJob();
+        throw new HiveException("Operation is cancelled.");
+      }
+
       addToHistory(jobRef);
       sparkJobID = jobRef.getJobId();
       this.jobID = jobRef.getSparkJobStatus().getAppID();
@@ -130,11 +136,11 @@ public class SparkTask extends Task<SparkWork> {
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
         LOG.info("Failed to submit Spark job " + sparkJobID);
-        jobRef.cancelJob();
+        killJob();
       } else if (rc == 4) {
         LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
             ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
-        jobRef.cancelJob();
+        killJob();
       }
 
       if (this.jobID == null) {
@@ -305,14 +311,27 @@ public class SparkTask extends Task<SparkWork> {
   @Override
   public void shutdown() {
     super.shutdown();
-    if (jobRef != null && !isShutdown) {
+    killJob();
+    isShutdown = true;
+  }
+
+  private void killJob() {
+    boolean needToKillJob = false;
+    if (jobRef != null && !jobKilled) {
+      synchronized (this) {
+        if (!jobKilled) {
+          jobKilled = true;
+          needToKillJob = true;
+        }
+      }
+    }
+    if (needToKillJob) {
       try {
         jobRef.cancelJob();
       } catch (Exception e) {
         LOG.warn("failed to kill job", e);
       }
     }
-    isShutdown = true;
   }
 
   /**
@@ -393,6 +412,11 @@ public class SparkTask extends Task<SparkWork> {
     if (rc != 0) {
       Throwable error = sparkJobStatus.getError();
       if (error != null) {
+        if ((error instanceof InterruptedException) ||
+            (error instanceof HiveException &&
+            error.getCause() instanceof InterruptedException)) {
+          killJob();
+        }
         setException(error);
       }
     }
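For reference, the interrupt test added in the last hunk matches an InterruptedException either directly or wrapped exactly one level deep in a HiveException; deeper nestings do not match. A small illustrative sketch of that check, using a stand-in HiveException class in place of org.apache.hadoop.hive.ql.metadata.HiveException:

// Illustrative stand-in for Hive's HiveException; only its
// cause-wrapping behavior matters for this sketch.
class HiveException extends Exception {
  HiveException(String message, Throwable cause) { super(message, cause); }
}

public class InterruptCheckSketch {
  // Mirrors the test in the final hunk: an error counts as an interrupt
  // when it is an InterruptedException, or a HiveException whose direct
  // cause is one. Deeper wrappings deliberately do not match.
  static boolean isInterrupt(Throwable error) {
    return (error instanceof InterruptedException)
        || (error instanceof HiveException
            && error.getCause() instanceof InterruptedException);
  }

  public static void main(String[] args) {
    System.out.println(isInterrupt(new InterruptedException()));       // true
    System.out.println(isInterrupt(
        new HiveException("wrapped", new InterruptedException())));    // true
    System.out.println(isInterrupt(new RuntimeException("boom")));     // false
  }
}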