You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/24 19:40:22 UTC
[02/11] hive git commit: HIVE-19733:
RemoteSparkJobStatus#getSparkStageProgress inefficient implementation
(Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)
HIVE-19733: RemoteSparkJobStatus#getSparkStageProgress inefficient implementation (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed4fa73b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed4fa73b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed4fa73b
Branch: refs/heads/master-txnstats
Commit: ed4fa73ba740026ac0d4297d6a45432dc60d1073
Parents: 4e9562f
Author: Bharathkrishna Guruvayoor Murali <bh...@cloudera.com>
Authored: Mon Jul 23 18:35:04 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Mon Jul 23 18:35:41 2018 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 3 +
.../spark/status/impl/RemoteSparkJobStatus.java | 108 +++++++++++++------
2 files changed, 78 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4fa73b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 06d0ed3..37bc153 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -599,6 +599,9 @@ public enum ErrorMsg {
SPARK_GET_JOB_INFO_EXECUTIONERROR(30046, "Spark job failed in execution while getting job info due to exception {0}"),
REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired."),
+ // Raised by RemoteSparkJobStatus#getSparkStagesInfo when the GetSparkStagesInfoJob remote
+ // call times out ({0} = timeout in seconds), is interrupted, or fails in execution
+ // ({0} = message of the root cause of the failure).
+ SPARK_GET_STAGES_INFO_TIMEOUT(30048, "Spark job GetSparkStagesInfoJob timed out after {0} seconds.", true),
+ SPARK_GET_STAGES_INFO_INTERRUPTED(30049, "Spark job GetSparkStagesInfoJob was interrupted."),
+ SPARK_GET_STAGES_INFO_EXECUTIONERROR(30050, "Spark job GetSparkStagesInfoJob failed in execution while getting job info due to exception {0}", true),
//========================== 40000 range starts here ========================//
http://git-wip-us.apache.org/repos/asf/hive/blob/ed4fa73b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 832832b..3d41443 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -45,6 +45,7 @@ import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -103,18 +104,20 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
+ /**
+ * Builds a progress snapshot (total, completed, running and failed task counts) for each
+ * stage returned by getSparkStagesInfo(), keyed by stage id and current attempt id.
+ */
@Override
public Map<SparkStage, SparkStageProgress> getSparkStageProgress() throws HiveException {
+ // One remote call for all stages, replacing the per-stage getSparkStageInfo calls removed below.
+ List<SparkStageInfo> sparkStagesInfo = getSparkStagesInfo();
Map<SparkStage, SparkStageProgress> stageProgresses = new HashMap<SparkStage, SparkStageProgress>();
- for (int stageId : getStageIds()) {
- SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId);
- if (sparkStageInfo != null && sparkStageInfo.name() != null) {
- int runningTaskCount = sparkStageInfo.numActiveTasks();
- int completedTaskCount = sparkStageInfo.numCompletedTasks();
- int failedTaskCount = sparkStageInfo.numFailedTasks();
- int totalTaskCount = sparkStageInfo.numTasks();
- SparkStageProgress sparkStageProgress = new SparkStageProgress(
- totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
- SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId());
- stageProgresses.put(stage, sparkStageProgress);
+ // Defensive: tolerate a null list as well as null entries; either yields no progress rows.
+ if (sparkStagesInfo != null) {
+ for (SparkStageInfo sparkStageInfo : sparkStagesInfo) {
+ // Skip stages whose info is missing or whose name is null.
+ if (sparkStageInfo != null && sparkStageInfo.name() != null) {
+ int runningTaskCount = sparkStageInfo.numActiveTasks();
+ int completedTaskCount = sparkStageInfo.numCompletedTasks();
+ int failedTaskCount = sparkStageInfo.numFailedTasks();
+ int totalTaskCount = sparkStageInfo.numTasks();
+ SparkStageProgress sparkStageProgress =
+ new SparkStageProgress(totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+ SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId());
+ stageProgresses.put(stage, sparkStageProgress);
+ }
}
}
return stageProgresses;
@@ -212,14 +215,26 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
}
}
- private SparkStageInfo getSparkStageInfo(int stageId) {
- Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId));
- try {
- return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
- } catch (Throwable t) {
- LOG.warn("Error getting stage info", t);
- return null;
+ /**
+ * Fetches the stage info of every stage of the Spark job behind jobHandle with a single
+ * GetSparkStagesInfoJob call to the remote driver.
+ *
+ * @return the stage infos (the list may contain null entries); an empty list when the job
+ *     handle does not report exactly one Spark job id
+ * @throws HiveException if the remote call times out, is interrupted, or fails in execution
+ */
+ private List<SparkStageInfo> getSparkStagesInfo() throws HiveException {
+ // Query the id list once; it is only used when it holds exactly one job id.
+ List<Integer> sparkJobIds = jobHandle.getSparkJobIds();
+ Integer sparkJobId = sparkJobIds.size() == 1 ? sparkJobIds.get(0) : null;
+ if (sparkJobId == null) {
+ // Return an empty list rather than null so callers need no null handling.
+ return new ArrayList<>();
}
+ Future<ArrayList<SparkStageInfo>> getStagesInfo = sparkClient.run(
+ new GetSparkStagesInfoJob(jobHandle.getClientJobId(), sparkJobId));
+ try {
+ return getStagesInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_TIMEOUT,
+ Long.toString(sparkClientTimeoutInSeconds));
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so code further up the stack can still observe the interruption.
+ Thread.currentThread().interrupt();
+ throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_INTERRUPTED);
+ } catch (ExecutionException e) {
+ throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_EXECUTIONERROR,
+ Throwables.getRootCause(e).getMessage());
+ }
}
public JobHandle.State getRemoteJobState() {
@@ -229,25 +244,24 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
return jobHandle.getState();
}
- private static class GetJobInfoJob implements Job<SparkJobInfo> {
+ /**
+ * Remote job that resolves the SparkJobInfo of sparkJobId on the driver and returns the
+ * SparkStageInfo of each of its stages, so all stage progress is fetched in one round trip.
+ */
+ private static class GetSparkStagesInfoJob implements Job<ArrayList<SparkStageInfo>> {
private final String clientJobId;
private final int sparkJobId;
- private GetJobInfoJob() {
+ private GetSparkStagesInfoJob() {
// For serialization.
this(null, -1);
}
- GetJobInfoJob(String clientJobId, int sparkJobId) {
+ GetSparkStagesInfoJob(String clientJobId, int sparkJobId) {
this.clientJobId = clientJobId;
this.sparkJobId = sparkJobId;
}
-
@Override
- public SparkJobInfo call(JobContext jc) throws Exception {
+ public ArrayList<SparkStageInfo> call(JobContext jc) throws Exception {
SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
if (jobInfo == null) {
- List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId);
+ // Map#get returns null when nothing is monitored under clientJobId. Copying it
+ // unconditionally (new ArrayList<>(null)) would throw NullPointerException before
+ // the null check below could run, so only copy a list that is present.
+ List<JavaFutureAction<?>> monitoredJobs = jc.getMonitoredJobs().get(clientJobId);
+ List<JavaFutureAction<?>> list = monitoredJobs == null ? null : new ArrayList<>(monitoredJobs);
if (list != null && list.size() == 1) {
JavaFutureAction<?> futureAction = list.get(0);
if (futureAction.isDone()) {
@@ -266,25 +280,53 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
if (jobInfo == null) {
jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN);
}
- return jobInfo;
+ int[] stageIds = jobInfo.stageIds();
+ ArrayList<SparkStageInfo> sparkStageInfos = new ArrayList<>(stageIds.length);
+ for (int stageId : stageIds) {
+ SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageId);
+ // getStageInfo may return null (e.g. stage info not found or already discarded);
+ // leave such stages out instead of sending null entries back to the caller.
+ if (stageInfo != null) {
+ sparkStageInfos.add(stageInfo);
+ }
+ }
+ return sparkStageInfos;
}
}
+ /**
+ * Remote job that returns the SparkJobInfo of sparkJobId. If the status tracker has no info
+ * for it, the single finished JavaFutureAction monitored under clientJobId (if any) decides
+ * between a SUCCEEDED and a FAILED default; otherwise an UNKNOWN default is returned.
+ */
+ private static class GetJobInfoJob implements Job<SparkJobInfo> {
+ private final String clientJobId;
+ private final int sparkJobId;
- private static class GetStageInfoJob implements Job<SparkStageInfo> {
- private final int stageId;
-
- private GetStageInfoJob() {
+ private GetJobInfoJob() {
// For serialization.
- this(-1);
+ this(null, -1);
}
- GetStageInfoJob(int stageId) {
- this.stageId = stageId;
+ GetJobInfoJob(String clientJobId, int sparkJobId) {
+ this.clientJobId = clientJobId;
+ this.sparkJobId = sparkJobId;
}
@Override
- public SparkStageInfo call(JobContext jc) throws Exception {
- return jc.sc().statusTracker().getStageInfo(stageId);
+ public SparkJobInfo call(JobContext jc) throws Exception {
+ SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+ if (jobInfo == null) {
+ // No tracker info: fall back to the outcome of the monitored future, if it is done.
+ List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId);
+ if (list != null && list.size() == 1) {
+ JavaFutureAction<?> futureAction = list.get(0);
+ if (futureAction.isDone()) {
+ boolean futureSucceed = true;
+ // A failure of the finished job surfaces as an exception from get().
+ try {
+ futureAction.get();
+ } catch (Exception e) {
+ LOG.error("Failed to run job " + sparkJobId, e);
+ futureSucceed = false;
+ }
+ jobInfo = getDefaultJobInfo(sparkJobId,
+ futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
+ }
+ }
+ }
+ // Still nothing (no single monitored future, or it has not finished): report UNKNOWN.
+ if (jobInfo == null) {
+ jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN);
+ }
+ return jobInfo;
}
}