You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/17 21:53:58 UTC
svn commit: r1646335 -
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Author: xuefu
Date: Wed Dec 17 20:53:58 2014
New Revision: 1646335
URL: http://svn.apache.org/r1646335
Log:
HIVE-8972: Implement more fine-grained remote client-level events [Spark Branch] (Rui via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1646335&r1=1646334&r2=1646335&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Wed Dec 17 20:53:58 2014
@@ -42,15 +42,16 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* Used with remove spark client.
*/
public class RemoteSparkJobStatus implements SparkJobStatus {
private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
- // time (in seconds) to wait for a spark job to be submitted on remote cluster
+ // time (in milliseconds) to wait for a spark job to be submitted on remote cluster
// after this period, we decide the job submission has failed so that client won't hang forever
- private static final int WAIT_SUBMISSION_TIMEOUT = 30;
+ private static final long WAIT_SUBMISSION_TIMEOUT = 30000;
// remember when the monitor starts
private final long startTime;
private final SparkClient sparkClient;
@@ -59,7 +60,7 @@ public class RemoteSparkJobStatus implem
public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle) {
this.sparkClient = sparkClient;
this.jobHandle = jobHandle;
- startTime = System.currentTimeMillis();
+ startTime = System.nanoTime();
}
@Override
@@ -70,7 +71,7 @@ public class RemoteSparkJobStatus implem
@Override
public JobExecutionStatus getState() {
SparkJobInfo sparkJobInfo = getSparkJobInfo();
- return sparkJobInfo != null ? sparkJobInfo.status() : JobExecutionStatus.UNKNOWN;
+ return sparkJobInfo != null ? sparkJobInfo.status() : null;
}
@Override
@@ -132,28 +133,14 @@ public class RemoteSparkJobStatus implem
Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
jobHandle.getSparkJobIds().get(0) : null;
if (sparkJobId == null) {
- int duration = (int) ((System.currentTimeMillis() - startTime) / 1000);
+ long duration = TimeUnit.MILLISECONDS.convert(
+ System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (duration <= WAIT_SUBMISSION_TIMEOUT) {
return null;
} else {
- LOG.info("Job hasn't been submitted after " + duration + "s. Aborting it.");
+ LOG.info("Job hasn't been submitted after " + duration / 1000 + "s. Aborting it.");
jobHandle.cancel(false);
- return new SparkJobInfo() {
- @Override
- public int jobId() {
- return -1;
- }
-
- @Override
- public int[] stageIds() {
- return new int[0];
- }
-
- @Override
- public JobExecutionStatus status() {
- return JobExecutionStatus.FAILED;
- }
- };
+ return getDefaultJobInfo(sparkJobId, JobExecutionStatus.FAILED);
}
}
JobHandle<SparkJobInfo> getJobInfo = sparkClient.submit(
@@ -198,42 +185,12 @@ public class RemoteSparkJobStatus implem
if (list != null && list.size() == 1) {
JavaFutureAction<?> futureAction = list.get(0);
if (futureAction.isDone()) {
- jobInfo = new SparkJobInfo() {
- @Override
- public int jobId() {
- return sparkJobId;
- }
-
- @Override
- public int[] stageIds() {
- return new int[0];
- }
-
- @Override
- public JobExecutionStatus status() {
- return JobExecutionStatus.SUCCEEDED;
- }
- };
+ jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED);
}
}
}
if (jobInfo == null) {
- jobInfo = new SparkJobInfo() {
- @Override
- public int jobId() {
- return -1;
- }
-
- @Override
- public int[] stageIds() {
- return new int[0];
- }
-
- @Override
- public JobExecutionStatus status() {
- return JobExecutionStatus.UNKNOWN;
- }
- };
+ jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN);
}
return jobInfo;
}
@@ -288,4 +245,25 @@ public class RemoteSparkJobStatus implem
return results;
}
+
+ private static SparkJobInfo getDefaultJobInfo(final Integer jobId,
+ final JobExecutionStatus status) {
+ return new SparkJobInfo() {
+
+ @Override
+ public int jobId() {
+ return jobId == null ? -1 : jobId;
+ }
+
+ @Override
+ public int[] stageIds() {
+ return new int[0];
+ }
+
+ @Override
+ public JobExecutionStatus status() {
+ return status;
+ }
+ };
+ }
}