Posted to commits@hive.apache.org by su...@apache.org on 2017/04/13 17:31:01 UTC

hive git commit: Should kill running Spark Jobs when a query is cancelled (Zhihai Xu, reviewed by Chao Sun)

Repository: hive
Updated Branches:
  refs/heads/master a70042803 -> cbab5b29f


Should kill running Spark Jobs when a query is cancelled (Zhihai Xu, reviewed by Chao Sun)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbab5b29
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbab5b29
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbab5b29

Branch: refs/heads/master
Commit: cbab5b29f26ceb3d4633ade9647ce8bcb2f020a0
Parents: a700428
Author: Zhihai Xu <zh...@gmail.com>
Authored: Thu Apr 13 10:30:36 2017 -0700
Committer: Chao Sun <su...@apache.org>
Committed: Thu Apr 13 10:30:36 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/SparkTask.java    | 21 +++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cbab5b29/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 4c01329..32a7730 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -83,6 +83,8 @@ public class SparkTask extends Task<SparkWork> {
   private transient int totalTaskCount;
   private transient int failedTaskCount;
   private transient List<Integer> stageIds;
+  private transient SparkJobRef jobRef = null;
+  private transient boolean isShutdown = false;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -107,7 +109,7 @@ public class SparkTask extends Task<SparkWork> {
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
-      SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
+      jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
       addToHistory(jobRef);
@@ -290,6 +292,23 @@ public class SparkTask extends Task<SparkWork> {
     return finishTime;
   }
 
+  public boolean isTaskShutdown() {
+    return isShutdown;
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (jobRef != null && !isShutdown) {
+      try {
+        jobRef.cancelJob();
+      } catch (Exception e) {
+        LOG.warn("failed to kill job", e);
+      }
+    }
+    isShutdown = true;
+  }
+
   /**
    * Set the number of reducers for the spark work.
    */
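
----------------------------------------------------------------------

Editor's note on the pattern above: the patch keeps a handle to the
submitted Spark job (jobRef) and, when the query is cancelled, Hive
invokes the task's shutdown(), which now forwards the cancellation to
the running job via SparkJobRef.cancelJob(). The sketch below distills
that idempotent cancel-on-shutdown pattern into a self-contained,
runnable example. CancelableJobRef and DemoTask are hypothetical
stand-ins for Hive's SparkJobRef and SparkTask, not part of the Hive
API.

// Minimal sketch of the cancel-on-shutdown pattern introduced by the
// patch. CancelableJobRef and DemoTask are hypothetical stand-ins for
// SparkJobRef and SparkTask.
public class CancelDemo {

  interface CancelableJobRef {
    void cancelJob() throws Exception;
  }

  static class DemoTask {
    // Set once the job has been submitted; null until then.
    private volatile CancelableJobRef jobRef = null;
    // Guards against cancelling the same job twice.
    private volatile boolean isShutdown = false;

    void submit(CancelableJobRef ref) {
      this.jobRef = ref;
    }

    // Mirrors SparkTask.shutdown(): cancel at most once, and log rather
    // than propagate a failed cancellation.
    void shutdown() {
      if (jobRef != null && !isShutdown) {
        try {
          jobRef.cancelJob();
        } catch (Exception e) {
          System.err.println("failed to kill job: " + e);
        }
      }
      isShutdown = true;
    }
  }

  public static void main(String[] args) {
    DemoTask task = new DemoTask();
    task.submit(() -> System.out.println("job cancelled"));
    task.shutdown();  // prints "job cancelled"
    task.shutdown();  // no-op: isShutdown is already true
  }
}

The isShutdown flag serves two purposes here: it makes shutdown()
idempotent, and in the actual patch it is exposed through
isTaskShutdown(), presumably so callers can distinguish a job that was
cancelled on purpose from one that failed on its own.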