Posted to commits@hive.apache.org by br...@apache.org on 2014/11/17 18:03:09 UTC
svn commit: r1640190 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: ./
status/ status/impl/
Author: brock
Date: Mon Nov 17 17:03:08 2014
New Revision: 1640190
URL: http://svn.apache.org/r1640190
Log:
HIVE-8852 - Update new spark progress API for local submitted job monitoring [Spark Branch] (Rui Li via Brock)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
Removed:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Mon Nov 17 17:03:08 2014
@@ -32,14 +32,13 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -52,7 +51,6 @@ import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ui.jobs.JobProgressListener;
import scala.Tuple2;
@@ -113,17 +111,13 @@ public class SparkClient implements Seri
private List<String> localFiles = new ArrayList<String>();
- private JobStateListener jobStateListener;
-
- private JobProgressListener jobProgressListener;
+ private JobMetricsListener jobMetricsListener;
private SparkClient(Configuration hiveConf) {
SparkConf sparkConf = initiateSparkConf(hiveConf);
sc = new JavaSparkContext(sparkConf);
- jobStateListener = new JobStateListener();
- jobProgressListener = new JobProgressListener(sparkConf);
- sc.sc().listenerBus().addListener(jobStateListener);
- sc.sc().listenerBus().addListener(jobProgressListener);
+ jobMetricsListener = new JobMetricsListener();
+ sc.sc().listenerBus().addListener(jobMetricsListener);
}
private SparkConf initiateSparkConf(Configuration hiveConf) {
@@ -217,10 +211,11 @@ public class SparkClient implements Seri
JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
// We use a Spark RDD async action to submit the job, as it's currently the only way to get the jobId.
JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
- // As we always use foreach action to submit RDD graph, it would only trigger on job.
+ // As we always use foreach action to submit RDD graph, it would only trigger one job.
int jobId = future.jobIds().get(0);
SimpleSparkJobStatus sparkJobStatus =
- new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
+ new SimpleSparkJobStatus(sc, jobId, jobMetricsListener,
+ sparkCounters, future);
return new SparkJobRef(jobId, sparkJobStatus);
}
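For context, the async-submission pattern this hunk relies on looks roughly like the following. This is a minimal standalone sketch against the Spark 1.2-era Java API; the class name and the no-op function are illustrative stand-ins for SparkClient and HiveVoidFunction, not code from this patch.

import java.util.Arrays;

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class AsyncSubmitSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "async-submit-sketch");
    // foreachAsync() returns a future instead of blocking, and the future
    // exposes the id(s) of the job(s) it triggered, which is how the caller
    // learns the job id at submission time.
    JavaFutureAction<Void> future = sc.parallelize(Arrays.asList(1, 2, 3))
        .foreachAsync(new VoidFunction<Integer>() {
          @Override
          public void call(Integer i) { /* no-op, like HiveVoidFunction */ }
        });
    // A single foreach action triggers exactly one job, hence get(0).
    int jobId = future.jobIds().get(0);
    System.out.println("Submitted Spark job " + jobId);
    sc.stop();
  }
}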
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Mon Nov 17 17:03:08 2014
@@ -28,6 +28,7 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.spark.JobExecutionStatus;
/**
* SparkJobMonitor monitors a single Spark job's status in a loop until the job is finished/failed/killed.
@@ -59,26 +60,18 @@ public class SparkJobMonitor {
boolean done = false;
int failedCounter = 0;
int rc = 0;
- SparkJobState lastState = null;
+ JobExecutionStatus lastState = null;
Map<String, SparkStageProgress> lastProgressMap = null;
long startTime = 0;
while (true) {
-
try {
- Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
- SparkJobState state = sparkJobStatus.getState();
-
- if (state != lastState || state == SparkJobState.RUNNING) {
+ JobExecutionStatus state = sparkJobStatus.getState();
+ if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
lastState = state;
+ Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
switch (state) {
- case SUBMITTED:
- console.printInfo("Status: Submitted");
- break;
- case INITING:
- console.printInfo("Status: Initializing");
- break;
case RUNNING:
if (!running) {
// print job stages.
@@ -110,14 +103,7 @@ public class SparkJobMonitor {
running = false;
done = true;
break;
- case KILLED:
- console.printInfo("Status: Killed");
- running = false;
- done = true;
- rc = 1;
- break;
case FAILED:
- case ERROR:
console.printError("Status: Failed");
running = false;
done = true;
@@ -187,17 +173,17 @@ public class SparkJobMonitor {
String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
}
} else {
- double cost = progress.getCumulativeTime() / 1000.0;
/* stage is waiting for input/slots or complete */
if (failed > 0) {
/* tasks finished but some failed */
reportBuffer.append(
- String.format(
- "%s: %d(-%d)/%d Finished in %,.2fs\t", stageName, complete, failed, total, cost));
+ String.format(
+ "%s: %d(-%d)/%d Finished with failed tasks\t",
+ stageName, complete, failed, total));
} else {
if (complete == total) {
reportBuffer.append(
- String.format("%s: %d/%d Finished in %,.2fs\t", stageName, complete, total, cost));
+ String.format("%s: %d/%d Finished\t", stageName, complete, total));
} else {
reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
}
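The reworked loop reduces to polling JobExecutionStatus, which only knows RUNNING, SUCCEEDED, FAILED and UNKNOWN; that is why the SUBMITTED/INITING/KILLED/ERROR branches are gone and why a null check now guards jobs the status API has not registered yet. A minimal sketch of the same polling pattern outside Hive (the class and method names are hypothetical, and the real monitor's poll interval may differ):

import org.apache.spark.JobExecutionStatus;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.api.java.JavaSparkContext;

public class JobPollSketch {
  /** Polls until the job reaches a terminal state; returns 0 on success, 1 on failure. */
  public static int waitForJob(JavaSparkContext sc, int jobId) throws InterruptedException {
    JobExecutionStatus lastState = null;
    while (true) {
      SparkJobInfo info = sc.statusTracker().getJobInfo(jobId);
      // The tracker may not have seen the job yet, so info can be null.
      JobExecutionStatus state = (info == null) ? null : info.status();
      if (state != null && state != lastState) {
        lastState = state;
        System.out.println("Status: " + state);
      }
      if (state == JobExecutionStatus.SUCCEEDED) {
        return 0;
      }
      if (state == JobExecutionStatus.FAILED) {
        return 1;
      }
      Thread.sleep(1000);
    }
  }
}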
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Mon Nov 17 17:03:08 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.s
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.spark.JobExecutionStatus;
import java.util.Map;
@@ -29,7 +30,7 @@ public interface SparkJobStatus {
public int getJobId();
- public SparkJobState getState();
+ public JobExecutionStatus getState();
public int[] getStageIds();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java Mon Nov 17 17:03:08 2014
@@ -23,23 +23,21 @@ public class SparkStageProgress {
private int succeededTaskCount;
private int runningTaskCount;
private int failedTaskCount;
- private int killedTaskCount;
- private long cumulativeTime;
+ // TODO: the following two metrics were removed because they're not available in the current Spark API;
+ // we can add them back once Spark provides them.
+// private int killedTaskCount;
+// private long cumulativeTime;
public SparkStageProgress(
int totalTaskCount,
int succeededTaskCount,
int runningTaskCount,
- int failedTaskCount,
- int killedTaskCount,
- long cumulativeTime) {
+ int failedTaskCount) {
this.totalTaskCount = totalTaskCount;
this.succeededTaskCount = succeededTaskCount;
this.runningTaskCount = runningTaskCount;
this.failedTaskCount = failedTaskCount;
- this.killedTaskCount = killedTaskCount;
- this.cumulativeTime = cumulativeTime;
}
public int getTotalTaskCount() {
@@ -58,14 +56,6 @@ public class SparkStageProgress {
return failedTaskCount;
}
- public int getKilledTaskCount() {
- return killedTaskCount;
- }
-
- public long getCumulativeTime() {
- return cumulativeTime;
- }
-
@Override
public boolean equals(Object obj) {
if (obj instanceof SparkStageProgress) {
@@ -73,8 +63,7 @@ public class SparkStageProgress {
return getTotalTaskCount() == other.getTotalTaskCount()
&& getSucceededTaskCount() == other.getSucceededTaskCount()
&& getRunningTaskCount() == other.getRunningTaskCount()
- && getFailedTaskCount() == other.getFailedTaskCount()
- && getKilledTaskCount() == other.getKilledTaskCount();
+ && getFailedTaskCount() == other.getFailedTaskCount();
}
return false;
}
@@ -90,10 +79,6 @@ public class SparkStageProgress {
sb.append(getRunningTaskCount());
sb.append(" Failed: ");
sb.append(getFailedTaskCount());
- sb.append(" Killed: ");
- sb.append(getKilledTaskCount());
- sb.append(" CumulativeTime: ");
- sb.append(getCumulativeTime() + "ms");
return sb.toString();
}
}
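With the killed-task count and cumulative time gone, SparkStageProgress is a plain four-counter value object, and equals() compares exactly those counters; presumably this is what lets the monitor detect an unchanged progress map and skip reprinting. A small illustration with made-up counts:

import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;

public class ProgressEqualitySketch {
  public static void main(String[] args) {
    // Constructor arguments: total, succeeded, running, failed.
    SparkStageProgress a = new SparkStageProgress(10, 7, 2, 1);
    SparkStageProgress b = new SparkStageProgress(10, 7, 2, 1);
    // Equal iff the four remaining counters match; the removed fields
    // no longer participate in equality or toString().
    System.out.println(a.equals(b)); // prints: true
  }
}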
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java?rev=1640190&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java Mon Nov 17 17:03:08 2014
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.JobSucceeded;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+public class JobMetricsListener implements SparkListener {
+
+ private final static Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
+ private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+ private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+ private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+ }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+ }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+ }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+ }
+
+ @Override
+ public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ int stageId = taskEnd.stageId();
+ int stageAttemptId = taskEnd.stageAttemptId();
+ String stageIdentifier = stageId + "_" + stageAttemptId;
+ Integer jobId = stageIdToJobId.get(stageId);
+ if (jobId == null) {
+ LOG.warn("Can not find job id for stage[" + stageId + "].");
+ } else {
+ Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+ if (jobMetrics == null) {
+ jobMetrics = Maps.newHashMap();
+ allJobMetrics.put(jobId, jobMetrics);
+ }
+ List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+ if (stageMetrics == null) {
+ stageMetrics = Lists.newLinkedList();
+ jobMetrics.put(stageIdentifier, stageMetrics);
+ }
+ stageMetrics.add(taskEnd.taskMetrics());
+ }
+ }
+
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ int jobId = jobStart.jobId();
+ int size = jobStart.stageIds().size();
+ int[] intStageIds = new int[size];
+ for(int i=0; i< size; i++) {
+ Integer stageId = (Integer) jobStart.stageIds().apply(i);
+ intStageIds[i] = stageId;
+ stageIdToJobId.put(stageId, jobId);
+ }
+ jobIdToStageId.put(jobId, intStageIds);
+ }
+
+ @Override
+ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+
+ }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+ }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+ }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+ }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+ }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+ }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+ }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+ }
+
+ public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+ return allJobMetrics.get(jobId);
+ }
+
+ public synchronized void cleanup(int jobId) {
+ allJobMetrics.remove(jobId);
+ jobIdToStageId.remove(jobId);
+ Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Integer, Integer> entry = iterator.next();
+ if (entry.getValue() == jobId) {
+ iterator.remove();
+ }
+ }
+ }
+}
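Usage of the new listener mirrors what SparkClient now does: register it on the context's listener bus, run a job, then pull the per-stage TaskMetrics by job id. A sketch with two caveats: listener events arrive asynchronously, so metrics can lag the future slightly, and executorRunTime() is an assumed Spark 1.2-era TaskMetrics accessor that does not appear in this patch.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.executor.TaskMetrics;

public class MetricsListenerSketch {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local", "metrics-sketch");
    JobMetricsListener listener = new JobMetricsListener();
    sc.sc().listenerBus().addListener(listener); // same registration as SparkClient

    JavaFutureAction<Void> future = sc.parallelize(Arrays.asList(1, 2, 3))
        .foreachAsync(new VoidFunction<Integer>() {
          @Override
          public void call(Integer i) { }
        });
    int jobId = future.jobIds().get(0);
    future.get(); // wait for the job to finish

    // Keyed by "stageId_attemptId", one TaskMetrics entry per finished task.
    Map<String, List<TaskMetrics>> metrics = listener.getJobMetric(jobId);
    if (metrics != null) {
      for (Map.Entry<String, List<TaskMetrics>> e : metrics.entrySet()) {
        long runTimeMs = 0L;
        for (TaskMetrics tm : e.getValue()) {
          runTimeMs += tm.executorRunTime(); // assumed accessor, see lead-in
        }
        System.out.println("stage " + e.getKey() + ": " + runTimeMs + " ms executor time");
      }
    }
    listener.cleanup(jobId); // drop this job's entries once consumed
    sc.stop();
  }
}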
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Mon Nov 17 17:03:08 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.spark.status.impl;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -26,42 +25,35 @@ import com.google.common.collect.Maps;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.StageInfo;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.spark.ui.jobs.UIData;
import scala.Option;
-import scala.Tuple2;
-
-import static scala.collection.JavaConversions.bufferAsJavaList;
-import static scala.collection.JavaConversions.mutableMapAsJavaMap;
public class SimpleSparkJobStatus implements SparkJobStatus {
+ private final JavaSparkContext sparkContext;
private int jobId;
- private JobStateListener jobStateListener;
- private JobProgressListener jobProgressListener;
+ // After SPARK-2321, we only use JobMetricsListener to get job metrics
+ // TODO: remove it when the new API provides equivalent functionality
+ private JobMetricsListener jobMetricsListener;
private SparkCounters sparkCounters;
private JavaFutureAction<Void> future;
- public SimpleSparkJobStatus(
- int jobId,
- JobStateListener stateListener,
- JobProgressListener progressListener,
- SparkCounters sparkCounters,
- JavaFutureAction<Void> future) {
-
+ public SimpleSparkJobStatus(JavaSparkContext sparkContext, int jobId,
+ JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
+ JavaFutureAction<Void> future) {
+ this.sparkContext = sparkContext;
this.jobId = jobId;
- this.jobStateListener = stateListener;
- this.jobProgressListener = progressListener;
+ this.jobMetricsListener = jobMetricsListener;
this.sparkCounters = sparkCounters;
this.future = future;
}
@@ -72,62 +64,39 @@ public class SimpleSparkJobStatus implem
}
@Override
- public SparkJobState getState() {
+ public JobExecutionStatus getState() {
// For a Spark job with an empty source dataset, the job is never actually submitted, so we would
// never receive JobStart/JobEnd events in the listener; use JavaFutureAction to get the current
// job state.
if (future.isDone()) {
- return SparkJobState.SUCCEEDED;
+ return JobExecutionStatus.SUCCEEDED;
} else {
- return jobStateListener.getJobState(jobId);
+ // SparkJobInfo may not be available yet
+ SparkJobInfo sparkJobInfo = getJobInfo();
+ return sparkJobInfo == null ? null : sparkJobInfo.status();
}
}
@Override
public int[] getStageIds() {
- return jobStateListener.getStageIds(jobId);
+ SparkJobInfo sparkJobInfo = getJobInfo();
+ return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds();
}
@Override
public Map<String, SparkStageProgress> getSparkStageProgress() {
Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
- int[] stageIds = jobStateListener.getStageIds(jobId);
- if (stageIds != null) {
- for (int stageId : stageIds) {
- List<StageInfo> stageInfos = getStageInfo(stageId);
- for (StageInfo stageInfo : stageInfos) {
- Tuple2<Object, Object> tuple2 = new Tuple2<Object, Object>(stageInfo.stageId(),
- stageInfo.attemptId());
- UIData.StageUIData uiData = jobProgressListener.stageIdToData().get(tuple2).get();
- if (uiData != null) {
- int runningTaskCount = uiData.numActiveTasks();
- int completedTaskCount = uiData.numCompleteTasks();
- int failedTaskCount = uiData.numFailedTasks();
- int totalTaskCount = stageInfo.numTasks();
- int killedTaskCount = 0;
- long costTime;
- Option<Object> startOption = stageInfo.submissionTime();
- Option<Object> completeOption = stageInfo.completionTime();
- if (startOption.isEmpty()) {
- costTime = 0;
- } else if (completeOption.isEmpty()) {
- long startTime = (Long)startOption.get();
- costTime = System.currentTimeMillis() - startTime;
- } else {
- long startTime = (Long)startOption.get();
- long completeTime = (Long)completeOption.get();
- costTime = completeTime - startTime;
- }
- SparkStageProgress stageProgress = new SparkStageProgress(
- totalTaskCount,
- completedTaskCount,
- runningTaskCount,
- failedTaskCount,
- killedTaskCount,
- costTime);
- stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress);
- }
- }
+ for (int stageId : getStageIds()) {
+ SparkStageInfo sparkStageInfo = getStageInfo(stageId);
+ if (sparkStageInfo != null) {
+ int runningTaskCount = sparkStageInfo.numActiveTasks();
+ int completedTaskCount = sparkStageInfo.numCompletedTasks();
+ int failedTaskCount = sparkStageInfo.numFailedTasks();
+ int totalTaskCount = sparkStageInfo.numTasks();
+ SparkStageProgress sparkStageProgress = new SparkStageProgress(
+ totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+ stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
+ sparkStageInfo.currentAttemptId(), sparkStageProgress);
}
}
return stageProgresses;
@@ -145,7 +114,7 @@ public class SimpleSparkJobStatus implem
sparkStatisticsBuilder.add(sparkCounters);
// add spark job metrics.
String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
- Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+ Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
if (jobMetric == null) {
return null;
}
@@ -160,7 +129,7 @@ public class SimpleSparkJobStatus implem
@Override
public void cleanup() {
- jobStateListener.cleanup(jobId);
+ jobMetricsListener.cleanup(jobId);
}
private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
@@ -242,29 +211,11 @@ public class SimpleSparkJobStatus implem
return results;
}
- private List<StageInfo> getStageInfo(int stageId) {
- List<StageInfo> stageInfos = new LinkedList<StageInfo>();
-
- Map<Object, StageInfo> activeStages = mutableMapAsJavaMap(jobProgressListener.activeStages());
- List<StageInfo> completedStages = bufferAsJavaList(jobProgressListener.completedStages());
- List<StageInfo> failedStages = bufferAsJavaList(jobProgressListener.failedStages());
-
- if (activeStages.containsKey(stageId)) {
- stageInfos.add(activeStages.get(stageId));
- } else {
- for (StageInfo stageInfo : completedStages) {
- if (stageInfo.stageId() == stageId) {
- stageInfos.add(stageInfo);
- }
- }
-
- for (StageInfo stageInfo : failedStages) {
- if (stageInfo.stageId() == stageId) {
- stageInfos.add(stageInfo);
- }
- }
- }
+ private SparkJobInfo getJobInfo() {
+ return sparkContext.statusTracker().getJobInfo(jobId);
+ }
- return stageInfos;
+ private SparkStageInfo getStageInfo(int stageId) {
+ return sparkContext.statusTracker().getStageInfo(stageId);
}
}
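The net effect of the SimpleSparkJobStatus rewrite: every job/stage read that previously went through the UI's JobProgressListener and Scala collection conversions now goes through the public status tracker introduced by SPARK-2321. A condensed sketch of the whole read path (printProgress is a hypothetical helper; every accessor it calls appears in the patch above):

import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaSparkContext;

public class StatusTrackerSketch {
  /** Hypothetical helper: job id -> stage ids -> per-stage task counts. */
  public static void printProgress(JavaSparkContext sc, int jobId) {
    SparkJobInfo job = sc.statusTracker().getJobInfo(jobId);
    if (job == null) {
      return; // the tracker has not registered the job yet
    }
    for (int stageId : job.stageIds()) {
      SparkStageInfo stage = sc.statusTracker().getStageInfo(stageId);
      if (stage != null) {
        // Same "stageId_attemptId" key and counters SimpleSparkJobStatus builds
        // its SparkStageProgress from.
        System.out.printf("%d_%d: %d(+%d)/%d, failed: %d%n",
            stage.stageId(), stage.currentAttemptId(),
            stage.numCompletedTasks(), stage.numActiveTasks(),
            stage.numTasks(), stage.numFailedTasks());
      }
    }
  }
}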