You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/26 19:58:37 UTC

svn commit: r1654869 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl: LocalSparkJobStatus.java RemoteSparkJobStatus.java

Author: xuefu
Date: Mon Jan 26 18:58:37 2015
New Revision: 1654869

URL: http://svn.apache.org/r1654869
Log:
HIVE-9428: LocalSparkJobStatus may return failed job as successful [Spark Branch] (Rui via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1654869&r1=1654868&r2=1654869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Mon Jan 26 18:58:37 2015
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
@@ -43,6 +45,7 @@ import com.google.common.collect.Maps;
 public class LocalSparkJobStatus implements SparkJobStatus {
 
   private final JavaSparkContext sparkContext;
+  private static final Log LOG = LogFactory.getLog(LocalSparkJobStatus.class.getName());
   private int jobId;
   // After SPARK-2321, we only use JobMetricsListener to get job metrics
   // TODO: remove it when the new API provides equivalent functionality
@@ -69,16 +72,20 @@ public class LocalSparkJobStatus impleme
 
   @Override
   public JobExecutionStatus getState() {
+    SparkJobInfo sparkJobInfo = getJobInfo();
     // For spark job with empty source data, it's not submitted actually, so we would never
     // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
     // job state.
-    if (future.isDone()) {
+    if (sparkJobInfo == null && future.isDone()) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        LOG.error("Failed to run job " + jobId, e);
+        return JobExecutionStatus.FAILED;
+      }
       return JobExecutionStatus.SUCCEEDED;
-    } else {
-      // SparkJobInfo may not be available yet
-      SparkJobInfo sparkJobInfo = getJobInfo();
-      return sparkJobInfo == null ? null : sparkJobInfo.status();
     }
+    return sparkJobInfo == null ? null : sparkJobInfo.status();
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1654869&r1=1654868&r2=1654869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Mon Jan 26 18:58:37 2015
@@ -179,7 +179,15 @@ public class RemoteSparkJobStatus implem
         if (list != null && list.size() == 1) {
           JavaFutureAction<?> futureAction = list.get(0);
           if (futureAction.isDone()) {
-            jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED);
+            boolean futureSucceed = true;
+            try {
+              futureAction.get();
+            } catch (Exception e) {
+              LOG.error("Failed to run job " + sparkJobId, e);
+              futureSucceed = false;
+            }
+            jobInfo = getDefaultJobInfo(sparkJobId,
+                futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
           }
         }
       }