You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by br...@apache.org on 2014/11/26 06:08:45 UTC

svn commit: r1641766 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Author: brock
Date: Wed Nov 26 05:08:45 2014
New Revision: 1641766

URL: http://svn.apache.org/r1641766
Log:
HIVE-8956 - Hive hangs while some error/exception happens beyond job execution [Spark Branch] (Rui Li via Brock)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1641766&r1=1641765&r2=1641766&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Wed Nov 26 05:08:45 2014
@@ -45,12 +45,18 @@ import java.util.Map;
  */
 public class RemoteSparkJobStatus implements SparkJobStatus {
   private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
+  // time (in seconds) to wait for a spark job to be submitted on remote cluster
+  // after this period, we decide the job submission has failed so that client won't hang forever
+  private static final int WAIT_SUBMISSION_TIMEOUT = 30;
+  // remember when the monitor starts
+  private final long startTime;
   private final SparkClient sparkClient;
   private final JobHandle<Serializable> jobHandle;
 
   public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle) {
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
+    startTime = System.currentTimeMillis();
   }
 
   @Override
@@ -108,7 +114,29 @@ public class RemoteSparkJobStatus implem
     Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
         jobHandle.getSparkJobIds().get(0) : null;
     if (sparkJobId == null) {
-      return null;
+      int duration = (int) ((System.currentTimeMillis() - startTime) / 1000);
+      if (duration <= WAIT_SUBMISSION_TIMEOUT) {
+        return null;
+      } else {
+        LOG.info("Job hasn't been submitted after " + duration + "s. Aborting it.");
+        jobHandle.cancel(false);
+        return new SparkJobInfo() {
+          @Override
+          public int jobId() {
+            return -1;
+          }
+
+          @Override
+          public int[] stageIds() {
+            return new int[0];
+          }
+
+          @Override
+          public JobExecutionStatus status() {
+            return JobExecutionStatus.FAILED;
+          }
+        };
+      }
     }
     JobHandle<HiveSparkJobInfo> getJobInfo = sparkClient.submit(
         new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));