You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2016/12/13 07:10:20 UTC

hive git commit: HIVE-15386: Expose Spark task counts and stage Ids information in SparkTask from SparkJobMonitor (Zhihai Xu, reviewed by Xuefu and Rui, via Rui)

Repository: hive
Updated Branches:
  refs/heads/master cccd38422 -> 217e641b3


HIVE-15386: Expose Spark task counts and stage Ids information in SparkTask from SparkJobMonitor (Zhihai Xu, reviewed by Xuefu and Rui, via Rui)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/217e641b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/217e641b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/217e641b

Branch: refs/heads/master
Commit: 217e641b3d7a73f5af2a4308613178a3b3732860
Parents: cccd384
Author: Zhihai Xu <zh...@gmail.com>
Authored: Tue Dec 13 15:10:08 2016 +0800
Committer: Rui Li <li...@apache.org>
Committed: Tue Dec 13 15:10:08 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/SparkTask.java    | 71 ++++++++++++++++++++
 1 file changed, 71 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/217e641b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 87e96a9..f836065 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -74,6 +76,13 @@ public class SparkTask extends Task<SparkWork> {
   private static final long serialVersionUID = 1L;
   private transient String sparkJobID;
   private transient SparkStatistics sparkStatistics;
+  private transient long submitTime;
+  private transient long startTime;
+  private transient long finishTime;
+  private transient int succeededTaskCount;
+  private transient int totalTaskCount;
+  private transient int failedTaskCount;
+  private transient List<Integer> stageIds;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -96,6 +105,7 @@ public class SparkTask extends Task<SparkWork> {
       sparkWork.setRequiredCounterPrefix(getOperatorCounters());
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+      submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
@@ -104,6 +114,7 @@ public class SparkTask extends Task<SparkWork> {
       this.jobID = jobRef.getSparkJobStatus().getAppID();
       rc = jobRef.monitorJob();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
+      getSparkJobInfo(sparkJobStatus);
       if (rc == 0) {
         sparkStatistics = sparkJobStatus.getSparkStatistics();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
@@ -130,6 +141,8 @@ public class SparkTask extends Task<SparkWork> {
       LOG.error(msg, e);
       rc = 1;
     } finally {
+      startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+      finishTime = perfLogger.getEndTime(PerfLogger.SPARK_RUN_JOB);
       Utilities.clearWork(conf);
       if (sparkSession != null && sparkSessionManager != null) {
         rc = close(rc);
@@ -239,6 +252,34 @@ public class SparkTask extends Task<SparkWork> {
     return sparkStatistics;
   }
 
+  public int getSucceededTaskCount() {
+    return succeededTaskCount; // succeeded tasks summed over all stages by getSparkJobInfo
+  }
+
+  public int getTotalTaskCount() {
+    return totalTaskCount; // total tasks summed over all stages by getSparkJobInfo
+  }
+
+  public int getFailedTaskCount() {
+    return failedTaskCount; // failed tasks summed over all stages by getSparkJobInfo
+  }
+
+  public List<Integer> getStageIds() {
+    return stageIds; // list built by getSparkJobInfo; null before that method has run
+  }
+
+  public long getStartTime() {
+    return startTime; // perfLogger end time of SPARK_SUBMIT_TO_RUNNING
+  }
+
+  public long getSubmitTime() {
+    return submitTime; // perfLogger start time of SPARK_SUBMIT_JOB
+  }
+
+  public long getFinishTime() {
+    return finishTime; // perfLogger end time of SPARK_RUN_JOB
+  }
+
   /**
    * Set the number of reducers for the spark work.
    */
@@ -288,4 +329,34 @@ public class SparkTask extends Task<SparkWork> {
 
     return counters;
   }
+
+  /**
+   * Records the job's stage ids and the task counts summed over every stage's progress.
+   * Best effort: any exception is logged and fields not yet assigned keep their old values.
+   */
+  private void getSparkJobInfo(SparkJobStatus sparkJobStatus) {
+    try {
+      stageIds = new ArrayList<Integer>();
+      int[] ids = sparkJobStatus.getStageIds();
+      if (ids != null) {
+        for (int stageId : ids) {
+          stageIds.add(stageId);
+        }
+      }
+      Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+      int sumTotal = 0;
+      int sumComplete = 0;
+      int sumFailed = 0;
+      for (SparkStageProgress progress : progressMap.values()) {
+        sumTotal += progress.getTotalTaskCount();
+        sumComplete += progress.getSucceededTaskCount();
+        sumFailed += progress.getFailedTaskCount();
+      }
+      succeededTaskCount = sumComplete;
+      totalTaskCount = sumTotal;
+      failedTaskCount = sumFailed;
+    } catch (Exception e) {
+      LOG.error("Failed to get Spark job information", e);
+    }
+  }
 }