Posted to commits@hive.apache.org by se...@apache.org on 2018/07/24 19:40:22 UTC

[02/11] hive git commit: HIVE-19733: RemoteSparkJobStatus#getSparkStageProgress inefficient implementation (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)

HIVE-19733: RemoteSparkJobStatus#getSparkStageProgress inefficient implementation (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed4fa73b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed4fa73b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed4fa73b

Branch: refs/heads/master-txnstats
Commit: ed4fa73ba740026ac0d4297d6a45432dc60d1073
Parents: 4e9562f
Author: Bharathkrishna Guruvayoor Murali <bh...@cloudera.com>
Authored: Mon Jul 23 18:35:04 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Mon Jul 23 18:35:41 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   3 +
 .../spark/status/impl/RemoteSparkJobStatus.java | 108 +++++++++++++------
 2 files changed, 78 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
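
In short: getSparkStageProgress() used to issue one remote GetStageInfoJob per stage ID, so a job with N stages cost N round-trips through the Spark client on every progress poll. The patch replaces that loop with a single GetSparkStagesInfoJob that resolves the job on the remote side and ships back every SparkStageInfo in one call, and it reports timeout, interrupt, and execution failures through the three new ErrorMsg codes instead of logging a warning and returning null. A minimal, self-contained sketch of the round-trip reduction follows; RemoteClient and the String "infos" are hypothetical stand-ins for SparkClient and SparkStageInfo, not the actual Hive classes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;

    public class BatchedStageFetchSketch {

      /** Hypothetical stand-in for SparkClient: submits a job and returns a Future. */
      interface RemoteClient {
        <T> Future<T> run(Callable<T> job);
      }

      // Old shape: one remote call per stage id -> N round-trips per progress poll.
      static List<String> fetchPerStage(RemoteClient client, int[] stageIds) throws Exception {
        List<String> infos = new ArrayList<>();
        for (int stageId : stageIds) {
          infos.add(client.run(() -> "info-for-stage-" + stageId).get(60, TimeUnit.SECONDS));
        }
        return infos;
      }

      // New shape: one remote job gathers all stage infos on the far side -> 1 round-trip.
      static List<String> fetchAllStages(RemoteClient client, int[] stageIds) throws Exception {
        return client.run(() -> {
          List<String> infos = new ArrayList<>();
          for (int stageId : stageIds) {
            infos.add("info-for-stage-" + stageId);   // gathered remotely, inside one job
          }
          return infos;
        }).get(60, TimeUnit.SECONDS);
      }

      public static void main(String[] args) throws Exception {
        // Trivial in-process client: runs each submitted job on the calling thread.
        RemoteClient client = new RemoteClient() {
          @Override public <T> Future<T> run(Callable<T> job) {
            FutureTask<T> task = new FutureTask<>(job);
            task.run();
            return task;
          }
        };
        int[] stageIds = {0, 1, 2};
        System.out.println(fetchPerStage(client, stageIds));   // three submissions
        System.out.println(fetchAllStages(client, stageIds));  // one submission
      }
    }

The remote side performs the same statusTracker lookups either way; what changes is that the monitor thread pays the client round-trip and the timeout once per poll instead of once per stage.
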


http://git-wip-us.apache.org/repos/asf/hive/blob/ed4fa73b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 06d0ed3..37bc153 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -599,6 +599,9 @@ public enum ErrorMsg {
   SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}"),
 
   REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired."),
+  SPARK_GET_STAGES_INFO_TIMEOUT(30048, "Spark job GetSparkStagesInfoJob timed out after {0} seconds.", true),
+  SPARK_GET_STAGES_INFO_INTERRUPTED(30049, "Spark job GetSparkStagesInfoJob was interrupted."),
+  SPARK_GET_STAGES_INFO_EXECUTIONERROR(30050, "Spark job GetSparkStagesInfoJob failed in execution while getting job info due to exception {0}", true),
 
   //========================== 40000 range starts here ========================//
 

http://git-wip-us.apache.org/repos/asf/hive/blob/ed4fa73b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 832832b..3d41443 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -45,6 +45,7 @@ import org.apache.spark.SparkStageInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -103,18 +104,20 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
 
   @Override
   public Map<SparkStage, SparkStageProgress> getSparkStageProgress() throws HiveException {
+    List<SparkStageInfo> sparkStagesInfo = getSparkStagesInfo();
     Map<SparkStage, SparkStageProgress> stageProgresses = new HashMap<SparkStage, SparkStageProgress>();
-    for (int stageId : getStageIds()) {
-      SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId);
-      if (sparkStageInfo != null && sparkStageInfo.name() != null) {
-        int runningTaskCount = sparkStageInfo.numActiveTasks();
-        int completedTaskCount = sparkStageInfo.numCompletedTasks();
-        int failedTaskCount = sparkStageInfo.numFailedTasks();
-        int totalTaskCount = sparkStageInfo.numTasks();
-        SparkStageProgress sparkStageProgress = new SparkStageProgress(
-            totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
-        SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId());
-        stageProgresses.put(stage, sparkStageProgress);
+    if (sparkStagesInfo != null) {
+      for (SparkStageInfo sparkStageInfo : sparkStagesInfo) {
+        if (sparkStageInfo != null && sparkStageInfo.name() != null) {
+          int runningTaskCount = sparkStageInfo.numActiveTasks();
+          int completedTaskCount = sparkStageInfo.numCompletedTasks();
+          int failedTaskCount = sparkStageInfo.numFailedTasks();
+          int totalTaskCount = sparkStageInfo.numTasks();
+          SparkStageProgress sparkStageProgress =
+              new SparkStageProgress(totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+          SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId());
+          stageProgresses.put(stage, sparkStageProgress);
+        }
       }
     }
     return stageProgresses;
@@ -212,14 +215,26 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
     }
   }
 
-  private SparkStageInfo getSparkStageInfo(int stageId) {
-    Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId));
-    try {
-      return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
-    } catch (Throwable t) {
-      LOG.warn("Error getting stage info", t);
+  private List<SparkStageInfo> getSparkStagesInfo() throws HiveException {
+
+    Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1
+        ? jobHandle.getSparkJobIds().get(0) : null;
+    if (sparkJobId == null) {
       return null;
     }
+    Future<ArrayList<SparkStageInfo>> getStagesInfo = sparkClient.run(
+        new GetSparkStagesInfoJob(jobHandle.getClientJobId(), sparkJobId));
+    try {
+      return getStagesInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_TIMEOUT,
+          Long.toString(sparkClientTimeoutInSeconds));
+    } catch (InterruptedException e) {
+      throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_INTERRUPTED);
+    } catch (ExecutionException e) {
+      throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_EXECUTIONERROR,
+          Throwables.getRootCause(e).getMessage());
+    }
   }
 
   public JobHandle.State getRemoteJobState() {
@@ -229,25 +244,24 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
     return jobHandle.getState();
   }
 
-  private static class GetJobInfoJob implements Job<SparkJobInfo> {
+  private static class GetSparkStagesInfoJob implements Job<ArrayList<SparkStageInfo>> {
     private final String clientJobId;
     private final int sparkJobId;
 
-    private GetJobInfoJob() {
+    private GetSparkStagesInfoJob() {
       // For serialization.
       this(null, -1);
     }
 
-    GetJobInfoJob(String clientJobId, int sparkJobId) {
+    GetSparkStagesInfoJob(String clientJobId, int sparkJobId) {
       this.clientJobId = clientJobId;
       this.sparkJobId = sparkJobId;
     }
-
     @Override
-    public SparkJobInfo call(JobContext jc) throws Exception {
+    public ArrayList<SparkStageInfo> call(JobContext jc) throws Exception {
       SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
       if (jobInfo == null) {
-        List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId);
+        ArrayList<JavaFutureAction<?>> list = new ArrayList<>(jc.getMonitoredJobs().get(clientJobId));
         if (list != null && list.size() == 1) {
           JavaFutureAction<?> futureAction = list.get(0);
           if (futureAction.isDone()) {
@@ -266,25 +280,53 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
       if (jobInfo == null) {
         jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN);
       }
-      return jobInfo;
+      ArrayList<SparkStageInfo> sparkStageInfos = new ArrayList<>();
+      int[] stageIds = jobInfo.stageIds();
+      for (int stageId : stageIds) {
+        SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageId);
+        sparkStageInfos.add(stageInfo);
+      }
+      return sparkStageInfos;
     }
   }
+  private static class GetJobInfoJob implements Job<SparkJobInfo> {
+    private final String clientJobId;
+    private final int sparkJobId;
 
-  private static class GetStageInfoJob implements Job<SparkStageInfo> {
-    private final int stageId;
-
-    private GetStageInfoJob() {
+    private GetJobInfoJob() {
       // For serialization.
-      this(-1);
+      this(null, -1);
     }
 
-    GetStageInfoJob(int stageId) {
-      this.stageId = stageId;
+    GetJobInfoJob(String clientJobId, int sparkJobId) {
+      this.clientJobId = clientJobId;
+      this.sparkJobId = sparkJobId;
     }
 
     @Override
-    public SparkStageInfo call(JobContext jc) throws Exception {
-      return jc.sc().statusTracker().getStageInfo(stageId);
+    public SparkJobInfo call(JobContext jc) throws Exception {
+      SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+      if (jobInfo == null) {
+        List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId);
+        if (list != null && list.size() == 1) {
+          JavaFutureAction<?> futureAction = list.get(0);
+          if (futureAction.isDone()) {
+            boolean futureSucceed = true;
+            try {
+              futureAction.get();
+            } catch (Exception e) {
+              LOG.error("Failed to run job " + sparkJobId, e);
+              futureSucceed = false;
+            }
+            jobInfo = getDefaultJobInfo(sparkJobId,
+                futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
+          }
+        }
+      }
+      if (jobInfo == null) {
+        jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN);
+      }
+      return jobInfo;
     }
   }
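
The other half of the change is visible in getSparkStagesInfo() above: instead of catching Throwable, logging a warning, and returning null, the three failure modes of Future#get are translated into distinct HiveExceptions. A generic, self-contained sketch of that unwrapping pattern, using plain Exception in place of HiveException and the ErrorMsg codes (which exist only in the real code above):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class FutureUnwrapSketch {

      // Translate the three failure modes of Future#get into distinct, descriptive
      // errors, mirroring the catch branches added to getSparkStagesInfo().
      static <T> T getOrFail(Future<T> future, long timeoutSeconds) throws Exception {
        try {
          return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
          throw new Exception("remote job timed out after " + timeoutSeconds + " seconds", e);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();              // preserve the interrupt flag
          throw new Exception("remote job was interrupted", e);
        } catch (ExecutionException e) {
          throw new Exception("remote job failed: " + e.getCause(), e);
        }
      }

      public static void main(String[] args) throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> "ok");
        task.run();
        System.out.println(getOrFail(task, 5));            // prints "ok"
      }
    }
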