You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2017/04/13 17:31:01 UTC
hive git commit: Should kill running Spark Jobs when a query is
cancelled (Zhihai Xu, reviewed by Chao Sun)
Repository: hive
Updated Branches:
refs/heads/master a70042803 -> cbab5b29f
Should kill running Spark Jobs when a query is cancelled (Zhihai Xu, reviewed by Chao Sun)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbab5b29
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbab5b29
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbab5b29
Branch: refs/heads/master
Commit: cbab5b29f26ceb3d4633ade9647ce8bcb2f020a0
Parents: a700428
Author: Zhihai Xu <zh...@gmail.com>
Authored: Thu Apr 13 10:30:36 2017 -0700
Committer: Chao Sun <su...@apache.org>
Committed: Thu Apr 13 10:30:36 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/spark/SparkTask.java | 21 +++++++++++++++++++-
1 file changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbab5b29/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 4c01329..32a7730 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -83,6 +83,8 @@ public class SparkTask extends Task<SparkWork> {
private transient int totalTaskCount;
private transient int failedTaskCount;
private transient List<Integer> stageIds;
+ private transient SparkJobRef jobRef = null;
+ private transient boolean isShutdown = false;
@Override
public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -107,7 +109,7 @@ public class SparkTask extends Task<SparkWork> {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
- SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
+ jobRef = sparkSession.submit(driverContext, sparkWork);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
addToHistory(jobRef);
@@ -290,6 +292,23 @@ public class SparkTask extends Task<SparkWork> {
return finishTime;
}
+ public boolean isTaskShutdown() {
+ return isShutdown;
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ if (jobRef != null && !isShutdown) {
+ try {
+ jobRef.cancelJob();
+ } catch (Exception e) {
+ LOG.warn("failed to kill job", e);
+ }
+ }
+ isShutdown = true;
+ }
+
/**
* Set the number of reducers for the spark work.
*/