You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/11/26 06:08:45 UTC
svn commit: r1641766 -
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Author: brock
Date: Wed Nov 26 05:08:45 2014
New Revision: 1641766
URL: http://svn.apache.org/r1641766
Log:
HIVE-8956 - Hive hangs while some error/exception happens beyond job execution [Spark Branch] (Rui li via Brock)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1641766&r1=1641765&r2=1641766&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Wed Nov 26 05:08:45 2014
@@ -45,12 +45,18 @@ import java.util.Map;
*/
public class RemoteSparkJobStatus implements SparkJobStatus {
private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
+ // Time (in seconds) to wait for a Spark job to be submitted on the remote cluster.
+ // After this period, we treat the job submission as failed so that the client won't hang forever.
+ private static final int WAIT_SUBMISSION_TIMEOUT = 30;
+ // remember when the monitor starts
+ private final long startTime;
private final SparkClient sparkClient;
private final JobHandle<Serializable> jobHandle;
public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle) {
this.sparkClient = sparkClient;
this.jobHandle = jobHandle;
+ startTime = System.currentTimeMillis();
}
@Override
@@ -108,7 +114,29 @@ public class RemoteSparkJobStatus implem
Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
jobHandle.getSparkJobIds().get(0) : null;
if (sparkJobId == null) {
- return null;
+ int duration = (int) ((System.currentTimeMillis() - startTime) / 1000);
+ if (duration <= WAIT_SUBMISSION_TIMEOUT) {
+ return null;
+ } else {
+ LOG.info("Job hasn't been submitted after " + duration + "s. Aborting it.");
+ jobHandle.cancel(false);
+ return new SparkJobInfo() {
+ @Override
+ public int jobId() {
+ return -1;
+ }
+
+ @Override
+ public int[] stageIds() {
+ return new int[0];
+ }
+
+ @Override
+ public JobExecutionStatus status() {
+ return JobExecutionStatus.FAILED;
+ }
+ };
+ }
}
JobHandle<HiveSparkJobInfo> getJobInfo = sparkClient.submit(
new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));