Posted to commits@hive.apache.org by xu...@apache.org on 2017/05/09 16:40:27 UTC

hive git commit: HIVE-16456: Kill spark job when InterruptedException happens or driverContext.isShutdown is true (Zhihai via Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master 067d953bf -> 4ba48aa5f


HIVE-16456: Kill spark job when InterruptedException happens or driverContext.isShutdown is true (Zhihai via Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ba48aa5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ba48aa5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ba48aa5

Branch: refs/heads/master
Commit: 4ba48aa5fcaa981ee469161bbf17611aa0392fd2
Parents: 067d953
Author: Xuefu Zhang <xu...@uber.com>
Authored: Tue May 9 09:40:13 2017 -0700
Committer: Xuefu Zhang <xu...@uber.com>
Committed: Tue May 9 09:40:13 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/SparkTask.java    | 32 +++++++++++++++++---
 1 file changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
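
In outline, the diff below routes every cancellation through a new idempotent killJob() helper, guarded by a jobKilled flag and a double-checked synchronized block, and invokes it in three new places: right after submit when driverContext is already shut down, when submission fails (rc == 3) or the task limit is exceeded (rc == 4), and when the job's error is (or directly wraps) an InterruptedException. The following is a minimal, standalone sketch of the cancel-at-most-once pattern only; JobHandle and KillOnceSketch are hypothetical stand-ins (SparkJobRef and SparkTask in the actual patch), and logging is simplified.

// Sketch only: JobHandle is a hypothetical stand-in for Hive's SparkJobRef.
public class KillOnceSketch {

  /** Hypothetical stand-in for org.apache.hadoop.hive.ql.exec.spark.SparkJobRef. */
  interface JobHandle {
    void cancelJob();
  }

  private volatile JobHandle jobRef;    // set once the job has been submitted
  private boolean jobKilled = false;    // written only inside synchronized (this)

  /** Cancels the job at most once, even if several threads race into here. */
  private void killJob() {
    boolean needToKillJob = false;
    if (jobRef != null && !jobKilled) { // cheap pre-check without the lock
      synchronized (this) {             // re-check under the lock
        if (!jobKilled) {
          jobKilled = true;             // claim the kill before performing it
          needToKillJob = true;
        }
      }
    }
    if (needToKillJob) {                // do the actual cancel outside the lock
      try {
        jobRef.cancelJob();
      } catch (Exception e) {
        System.err.println("failed to kill job: " + e);
      }
    }
  }

  public static void main(String[] args) {
    KillOnceSketch task = new KillOnceSketch();
    task.jobRef = () -> System.out.println("cancelJob() called");
    task.killJob();  // cancels the job exactly once
    task.killJob();  // no-op: jobKilled is already set
  }
}

Setting the flag inside the lock but running cancelJob() outside it keeps the critical section tiny: racing callers agree on a single winner, and a slow or throwing cancel cannot block other threads on the task's monitor.
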


http://git-wip-us.apache.org/repos/asf/hive/blob/4ba48aa5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 98b1605..b4fb49f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -85,6 +85,7 @@ public class SparkTask extends Task<SparkWork> {
   private transient List<Integer> stageIds;
   private transient SparkJobRef jobRef = null;
   private transient boolean isShutdown = false;
+  private transient boolean jobKilled = false;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -112,6 +113,11 @@ public class SparkTask extends Task<SparkWork> {
       jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
+      if (driverContext.isShutdown()) {
+        killJob();
+        throw new HiveException("Operation is cancelled.");
+      }
+
       addToHistory(jobRef);
       sparkJobID = jobRef.getJobId();
       this.jobID = jobRef.getSparkJobStatus().getAppID();
@@ -130,11 +136,11 @@ public class SparkTask extends Task<SparkWork> {
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
         LOG.info("Failed to submit Spark job " + sparkJobID);
-        jobRef.cancelJob();
+        killJob();
       } else if (rc == 4) {
         LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
             ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
-        jobRef.cancelJob();
+        killJob();
       }
 
       if (this.jobID == null) {
@@ -305,14 +311,27 @@ public class SparkTask extends Task<SparkWork> {
   @Override
   public void shutdown() {
     super.shutdown();
-    if (jobRef != null && !isShutdown) {
+    killJob();
+    isShutdown = true;
+  }
+
+  private void killJob() {
+    boolean needToKillJob = false;
+    if (jobRef != null && !jobKilled) {
+      synchronized (this) {
+        if (!jobKilled) {
+          jobKilled = true;
+          needToKillJob = true;
+        }
+      }
+    }
+    if (needToKillJob) {
       try {
         jobRef.cancelJob();
       } catch (Exception e) {
         LOG.warn("failed to kill job", e);
       }
     }
-    isShutdown = true;
   }
 
   /**
@@ -393,6 +412,11 @@ public class SparkTask extends Task<SparkWork> {
       if (rc != 0) {
         Throwable error = sparkJobStatus.getError();
         if (error != null) {
+          if ((error instanceof InterruptedException) ||
+              (error instanceof HiveException &&
+              error.getCause() instanceof InterruptedException)) {
+            killJob();
+          }
           setException(error);
         }
       }
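
And a hedged sketch of the check the final hunk adds: an error counts as an interruption when it is an InterruptedException itself, or when it is a HiveException whose direct cause is one. WrappedException and InterruptCheckSketch below are hypothetical stand-ins (HiveException in the actual patch).

// Sketch only: WrappedException is a hypothetical stand-in for HiveException.
public class InterruptCheckSketch {

  static class WrappedException extends Exception {
    WrappedException(String msg, Throwable cause) { super(msg, cause); }
  }

  /** True if the error is, or directly wraps, an InterruptedException. */
  static boolean isInterruption(Throwable error) {
    return error instanceof InterruptedException
        || (error instanceof WrappedException
            && error.getCause() instanceof InterruptedException);
  }

  public static void main(String[] args) {
    Throwable direct = new InterruptedException();
    Throwable wrapped = new WrappedException("cancelled", new InterruptedException());
    Throwable other = new RuntimeException("unrelated failure");
    System.out.println(isInterruption(direct));   // true
    System.out.println(isInterruption(wrapped));  // true
    System.out.println(isInterruption(other));    // false
  }
}
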