You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/26 19:58:37 UTC
svn commit: r1654869 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl: LocalSparkJobStatus.java RemoteSparkJobStatus.java
Author: xuefu
Date: Mon Jan 26 18:58:37 2015
New Revision: 1654869
URL: http://svn.apache.org/r1654869
Log:
HIVE-9428: LocalSparkJobStatus may return failed job as successful [Spark Branch] (Rui via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1654869&r1=1654868&r2=1654869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Mon Jan 26 18:58:37 2015
@@ -22,6 +22,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -43,6 +45,7 @@ import com.google.common.collect.Maps;
public class LocalSparkJobStatus implements SparkJobStatus {
private final JavaSparkContext sparkContext;
+ private static final Log LOG = LogFactory.getLog(LocalSparkJobStatus.class.getName());
private int jobId;
// After SPARK-2321, we only use JobMetricsListener to get job metrics
// TODO: remove it when the new API provides equivalent functionality
@@ -69,16 +72,20 @@ public class LocalSparkJobStatus impleme
@Override
public JobExecutionStatus getState() {
+ SparkJobInfo sparkJobInfo = getJobInfo();
// For spark job with empty source data, it's not submitted actually, so we would never
// receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
// job state.
- if (future.isDone()) {
+ if (sparkJobInfo == null && future.isDone()) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.error("Failed to run job " + jobId, e);
+ return JobExecutionStatus.FAILED;
+ }
return JobExecutionStatus.SUCCEEDED;
- } else {
- // SparkJobInfo may not be available yet
- SparkJobInfo sparkJobInfo = getJobInfo();
- return sparkJobInfo == null ? null : sparkJobInfo.status();
}
+ return sparkJobInfo == null ? null : sparkJobInfo.status();
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1654869&r1=1654868&r2=1654869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Mon Jan 26 18:58:37 2015
@@ -179,7 +179,15 @@ public class RemoteSparkJobStatus implem
if (list != null && list.size() == 1) {
JavaFutureAction<?> futureAction = list.get(0);
if (futureAction.isDone()) {
- jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED);
+ boolean futureSucceed = true;
+ try {
+ futureAction.get();
+ } catch (Exception e) {
+ LOG.error("Failed to run job " + sparkJobId, e);
+ futureSucceed = false;
+ }
+ jobInfo = getDefaultJobInfo(sparkJobId,
+ futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
}
}
}