You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2017/02/03 18:36:57 UTC

hive git commit: HIVE-15772: Set the exception on SparkJobStatus if an exception occurs in RemoteSparkJobMonitor and LocalSparkJobMonitor (Zhihai Xu, reviewed by Chao Sun)

Repository: hive
Updated Branches:
  refs/heads/master d7f71fb4a -> e45da2694


HIVE-15772: Set the exception on SparkJobStatus if an exception occurs in RemoteSparkJobMonitor and LocalSparkJobMonitor (Zhihai Xu, reviewed by Chao Sun)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e45da269
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e45da269
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e45da269

Branch: refs/heads/master
Commit: e45da2694a7841427937392b46ac33a8f6ae3fcb
Parents: d7f71fb
Author: Zhihai Xu <zh...@gmail.com>
Authored: Fri Feb 3 10:36:15 2017 -0800
Committer: Chao Sun <su...@apache.org>
Committed: Fri Feb 3 10:36:27 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/exec/spark/status/LocalSparkJobMonitor.java   |  1 +
 .../hive/ql/exec/spark/status/RemoteSparkJobMonitor.java  |  1 +
 .../hadoop/hive/ql/exec/spark/status/SparkJobStatus.java  |  2 ++
 .../ql/exec/spark/status/impl/LocalSparkJobStatus.java    | 10 ++++++++++
 .../ql/exec/spark/status/impl/RemoteSparkJobStatus.java   | 10 ++++++++++
 5 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index b6d128b..a678228 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -128,6 +128,7 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
         console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
+        sparkJobStatus.setError(e);
       } finally {
         if (done) {
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 77038fc..ef3d8f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -158,6 +158,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
         console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
         rc = 1;
         done = true;
+        sparkJobStatus.setError(e);
       } finally {
         if (done) {
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index 72ce439..1ebb1ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -46,4 +46,6 @@ public interface SparkJobStatus {
   void cleanup();
 
   Throwable getError();
+
+  void setError(Throwable e);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index a94d4ed..ab8a9cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -49,6 +49,7 @@ public class LocalSparkJobStatus implements SparkJobStatus {
   private SparkCounters sparkCounters;
   private JavaFutureAction<Void> future;
   private Set<Integer> cachedRDDIds;
+  private Throwable error;
 
   public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
       JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
@@ -59,6 +60,7 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     this.sparkCounters = sparkCounters;
     this.cachedRDDIds = cachedRDDIds;
     this.future = future;
+    this.error = null;
   }
 
   @Override
@@ -161,6 +163,9 @@ public class LocalSparkJobStatus implements SparkJobStatus {
 
   @Override
   public Throwable getError() {
+    if (error != null) {
+      return error;
+    }
     if (future.isDone()) {
       try {
         future.get();
@@ -171,6 +176,11 @@ public class LocalSparkJobStatus implements SparkJobStatus {
     return null;
   }
 
+  @Override
+  public void setError(Throwable e) {
+    this.error = e;
+  }
+
   private SparkJobInfo getJobInfo() {
     return sparkContext.statusTracker().getJobInfo(jobId);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e45da269/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index e87a21a..0e3e541 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -50,11 +50,13 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
   private static final Logger LOG = LoggerFactory.getLogger(RemoteSparkJobStatus.class.getName());
   private final SparkClient sparkClient;
   private final JobHandle<Serializable> jobHandle;
+  private Throwable error;
   private final transient long sparkClientTimeoutInSeconds;
 
   public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle, long timeoutInSeconds) {
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
+    this.error = null;
     this.sparkClientTimeoutInSeconds = timeoutInSeconds;
   }
 
@@ -138,9 +140,17 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
 
   @Override
   public Throwable getError() {
+    if (error != null) {
+      return error;
+    }
     return jobHandle.getError();
   }
 
+  @Override
+  public void setError(Throwable e) {
+    this.error = e;
+  }
+
   private SparkJobInfo getSparkJobInfo() throws HiveException {
     Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1
       ? jobHandle.getSparkJobIds().get(0) : null;