Posted to commits@hive.apache.org by li...@apache.org on 2016/12/13 07:10:20 UTC
hive git commit: HIVE-15386: Expose Spark task counts and stage Ids information in SparkTask from SparkJobMonitor (Zhihai Xu, reviewed by Xuefu and Rui, via Rui)
Repository: hive
Updated Branches:
refs/heads/master cccd38422 -> 217e641b3
HIVE-15386: Expose Spark task counts and stage Ids information in SparkTask from SparkJobMonitor (Zhihai Xu, reviewed by Xuefu and Rui, via Rui)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/217e641b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/217e641b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/217e641b
Branch: refs/heads/master
Commit: 217e641b3d7a73f5af2a4308613178a3b3732860
Parents: cccd384
Author: Zhihai Xu <zh...@gmail.com>
Authored: Tue Dec 13 15:10:08 2016 +0800
Committer: Rui Li <li...@apache.org>
Committed: Tue Dec 13 15:10:08 2016 +0800
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/spark/SparkTask.java | 71 ++++++++++++++++++++
1 file changed, 71 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/217e641b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 87e96a9..f836065 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -74,6 +76,13 @@ public class SparkTask extends Task<SparkWork> {
private static final long serialVersionUID = 1L;
private transient String sparkJobID;
private transient SparkStatistics sparkStatistics;
+ private transient long submitTime;
+ private transient long startTime;
+ private transient long finishTime;
+ private transient int succeededTaskCount;
+ private transient int totalTaskCount;
+ private transient int failedTaskCount;
+ private transient List<Integer> stageIds;
@Override
public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext,
@@ -96,6 +105,7 @@ public class SparkTask extends Task<SparkWork> {
sparkWork.setRequiredCounterPrefix(getOperatorCounters());
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+ submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
@@ -104,6 +114,7 @@ public class SparkTask extends Task<SparkWork> {
this.jobID = jobRef.getSparkJobStatus().getAppID();
rc = jobRef.monitorJob();
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
+ getSparkJobInfo(sparkJobStatus);
if (rc == 0) {
sparkStatistics = sparkJobStatus.getSparkStatistics();
if (LOG.isInfoEnabled() && sparkStatistics != null) {
@@ -130,6 +141,8 @@ public class SparkTask extends Task<SparkWork> {
LOG.error(msg, e);
rc = 1;
} finally {
+ startTime = perfLogger.getEndTime(PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+ finishTime = perfLogger.getEndTime(PerfLogger.SPARK_RUN_JOB);
Utilities.clearWork(conf);
if (sparkSession != null && sparkSessionManager != null) {
rc = close(rc);
@@ -239,6 +252,34 @@ public class SparkTask extends Task<SparkWork> {
return sparkStatistics;
}
+ public int getSucceededTaskCount() {
+ return succeededTaskCount;
+ }
+
+ public int getTotalTaskCount() {
+ return totalTaskCount;
+ }
+
+ public int getFailedTaskCount() {
+ return failedTaskCount;
+ }
+
+ public List<Integer> getStageIds() {
+ return stageIds;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getSubmitTime() {
+ return submitTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
/**
* Set the number of reducers for the spark work.
*/
@@ -288,4 +329,34 @@ public class SparkTask extends Task<SparkWork> {
return counters;
}
+
+ private void getSparkJobInfo(SparkJobStatus sparkJobStatus) {
+ try {
+ stageIds = new ArrayList<Integer>();
+ int[] ids = sparkJobStatus.getStageIds();
+ if (ids != null) {
+ for (int stageId : ids) {
+ stageIds.add(stageId);
+ }
+ }
+ Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+ int sumTotal = 0;
+ int sumComplete = 0;
+ int sumFailed = 0;
+ for (String s : progressMap.keySet()) {
+ SparkStageProgress progress = progressMap.get(s);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ final int failed = progress.getFailedTaskCount();
+ sumTotal += total;
+ sumComplete += complete;
+ sumFailed += failed;
+ }
+ succeededTaskCount = sumComplete;
+ totalTaskCount = sumTotal;
+ failedTaskCount = sumFailed;
+ } catch (Exception e) {
+ LOG.error("Failed to get Spark job information", e);
+ }
+ }
}
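----------------------------------------------------------------------
For readers who want to consume the accessors this patch adds, here is a minimal usage sketch. The SparkTaskMetricsLogger class and its report() method are hypothetical illustrations, not part of this commit; the only APIs assumed are the seven getters added above, and the sketch assumes the task has already been executed so that getSparkJobInfo() and the PerfLogger reads have populated the fields.

import java.util.List;

import org.apache.hadoop.hive.ql.exec.spark.SparkTask;

/**
 * Hypothetical helper (not part of this commit): summarizes the task
 * counters, stage IDs, and timing data that SparkTask now exposes
 * after it has finished running.
 */
public class SparkTaskMetricsLogger {

  public static String report(SparkTask task) {
    // Stage IDs collected by getSparkJobInfo() from SparkJobStatus.
    List<Integer> stageIds = task.getStageIds();
    // submitTime/startTime/finishTime are taken from PerfLogger events
    // recorded in execute(), i.e. millisecond wall-clock timestamps.
    long queuedMs = task.getStartTime() - task.getSubmitTime();
    long runningMs = task.getFinishTime() - task.getStartTime();
    return String.format(
        "stages=%s tasks[total=%d, succeeded=%d, failed=%d] queued=%d ms, running=%d ms",
        stageIds,
        task.getTotalTaskCount(),
        task.getSucceededTaskCount(),
        task.getFailedTaskCount(),
        queuedMs,
        runningMs);
  }
}

Note that startTime is read from the end of the SPARK_SUBMIT_TO_RUNNING perf event in the finally block, so the queued-time delta above is only meaningful once the job actually reached the running state.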