Posted to commits@hive.apache.org by br...@apache.org on 2014/11/17 18:03:09 UTC

svn commit: r1640190 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: ./ status/ status/impl/

Author: brock
Date: Mon Nov 17 17:03:08 2014
New Revision: 1640190

URL: http://svn.apache.org/r1640190
Log:
HIVE-8852 - Update new spark progress API for local submitted job monitoring [Spark Branch] (Rui Li via Brock)
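
Background for the change: SPARK-2321 added a public status API (SparkStatusTracker,
SparkJobInfo, SparkStageInfo and the JobExecutionStatus enum), so Hive can poll job and
stage state directly instead of registering Spark's internal JobProgressListener plus a
custom JobStateListener. A minimal sketch of the new polling style, assuming a live
JavaSparkContext sc and a known jobId (both stand-in names, not part of this patch):

    import org.apache.spark.JobExecutionStatus;
    import org.apache.spark.SparkJobInfo;
    import org.apache.spark.SparkStageInfo;

    // Poll the public status tracker instead of listening on the UI bus.
    SparkJobInfo jobInfo = sc.statusTracker().getJobInfo(jobId);
    if (jobInfo != null) {  // null until Spark has registered the job
      JobExecutionStatus state = jobInfo.status();
      for (int stageId : jobInfo.stageIds()) {
        SparkStageInfo stageInfo = sc.statusTracker().getStageInfo(stageId);
        // stageInfo exposes numTasks(), numActiveTasks(),
        // numCompletedTasks() and numFailedTasks()
      }
    }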

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
Removed:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Mon Nov 17 17:03:08 2014
@@ -32,14 +32,13 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -52,7 +51,6 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ui.jobs.JobProgressListener;
 
 import scala.Tuple2;
 
@@ -113,17 +111,13 @@ public class SparkClient implements Seri
 
   private List<String> localFiles = new ArrayList<String>();
 
-  private JobStateListener jobStateListener;
-
-  private JobProgressListener jobProgressListener;
+  private JobMetricsListener jobMetricsListener;
 
   private SparkClient(Configuration hiveConf) {
     SparkConf sparkConf = initiateSparkConf(hiveConf);
     sc = new JavaSparkContext(sparkConf);
-    jobStateListener = new JobStateListener();
-    jobProgressListener = new JobProgressListener(sparkConf);
-    sc.sc().listenerBus().addListener(jobStateListener);
-    sc.sc().listenerBus().addListener(jobProgressListener);
+    jobMetricsListener = new JobMetricsListener();
+    sc.sc().listenerBus().addListener(jobMetricsListener);
   }
 
   private SparkConf initiateSparkConf(Configuration hiveConf) {
@@ -217,10 +211,11 @@ public class SparkClient implements Seri
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
     // We use the Spark RDD async action to submit the job, as it's currently the only way to get the jobId.
     JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
-    // As we always use foreach action to submit RDD graph, it would only trigger on job.
+    // As we always use foreach action to submit RDD graph, it would only trigger one job.
     int jobId = future.jobIds().get(0);
     SimpleSparkJobStatus sparkJobStatus =
-      new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener, sparkCounters, future);
+        new SimpleSparkJobStatus(sc, jobId, jobMetricsListener,
+            sparkCounters, future);
     return new SparkJobRef(jobId, sparkJobStatus);
   }
 

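Two details carry the hunk above: the metrics listener must be added to the context's
listener bus before any job is submitted, and jobs are submitted through an async action
because JavaFutureAction is the only handle that exposes the job id. A sketch of that
flow, with rdd standing in for the generated RDD graph (hypothetical names, not part of
this patch):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaFutureAction;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;

    JobMetricsListener listener = new JobMetricsListener();
    sc.sc().listenerBus().addListener(listener);  // register before submitting anything

    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b"));  // stand-in for the plan's RDD graph
    JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<String>() {
      @Override
      public void call(String row) { /* no-op: we only need the action's job id */ }
    });
    int jobId = future.jobIds().get(0);  // one foreach action triggers exactly one job
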
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Mon Nov 17 17:03:08 2014
@@ -28,6 +28,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.spark.JobExecutionStatus;
 
 /**
  * SparkJobMonitor monitors a single Spark job's status in a loop until the job finishes, fails, or is killed.
@@ -59,26 +60,18 @@ public class SparkJobMonitor {
     boolean done = false;
     int failedCounter = 0;
     int rc = 0;
-    SparkJobState lastState = null;
+    JobExecutionStatus lastState = null;
     Map<String, SparkStageProgress> lastProgressMap = null;
     long startTime = 0;
 
     while (true) {
-
       try {
-        Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
-        SparkJobState state = sparkJobStatus.getState();
-
-        if (state != lastState || state == SparkJobState.RUNNING) {
+        JobExecutionStatus state = sparkJobStatus.getState();
+        if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
           lastState = state;
+          Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
 
           switch (state) {
-          case SUBMITTED:
-            console.printInfo("Status: Submitted");
-            break;
-          case INITING:
-            console.printInfo("Status: Initializing");
-            break;
           case RUNNING:
             if (!running) {
               // print job stages.
@@ -110,14 +103,7 @@ public class SparkJobMonitor {
             running = false;
             done = true;
             break;
-          case KILLED:
-            console.printInfo("Status: Killed");
-            running = false;
-            done = true;
-            rc = 1;
-            break;
           case FAILED:
-          case ERROR:
             console.printError("Status: Failed");
             running = false;
             done = true;
@@ -187,17 +173,17 @@ public class SparkJobMonitor {
               String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
           }
         } else {
-          double cost = progress.getCumulativeTime() / 1000.0;
           /* stage is waiting for input/slots or complete */
           if (failed > 0) {
             /* tasks finished but some failed */
             reportBuffer.append(
-              String.format(
-                "%s: %d(-%d)/%d Finished in %,.2fs\t", stageName, complete, failed, total, cost));
+                String.format(
+                    "%s: %d(-%d)/%d Finished with failed tasks\t",
+                    stageName, complete, failed, total));
           } else {
             if (complete == total) {
               reportBuffer.append(
-                String.format("%s: %d/%d Finished in %,.2fs\t", stageName, complete, total, cost));
+                  String.format("%s: %d/%d Finished\t", stageName, complete, total));
             } else {
               reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
             }
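
The reworked loop reduces to a simple shape: poll getState(), skip the iteration while
Spark has not yet registered the job (state == null), and fetch stage progress only once
there is a state to report. A condensed sketch of that control flow (poll interval and
error handling are illustrative, not from this patch):

    JobExecutionStatus lastState = null;
    while (true) {
      JobExecutionStatus state = sparkJobStatus.getState();
      // null until the job shows up in the status tracker
      if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
        lastState = state;
        Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
        if (state == JobExecutionStatus.SUCCEEDED || state == JobExecutionStatus.FAILED) {
          break;  // SUBMITTED/INITING/KILLED/ERROR have no equivalents in the public enum
        }
      }
      try {
        Thread.sleep(1000);  // illustrative interval
      } catch (InterruptedException e) {
        break;
      }
    }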

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Mon Nov 17 17:03:08 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.s
 
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.spark.JobExecutionStatus;
 
 import java.util.Map;
 
@@ -29,7 +30,7 @@ public interface SparkJobStatus {
 
   public int getJobId();
 
-  public SparkJobState getState();
+  public JobExecutionStatus getState();
 
   public int[] getStageIds();
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java Mon Nov 17 17:03:08 2014
@@ -23,23 +23,21 @@ public class SparkStageProgress {
   private int succeededTaskCount;
   private int runningTaskCount;
   private int failedTaskCount;
-  private int killedTaskCount;
-  private long cumulativeTime;
+  // TODO: remove the following two metrics, as they're not available in the current Spark API;
+  // we can add them back once Spark provides them
+//  private int killedTaskCount;
+//  private long cumulativeTime;
 
   public SparkStageProgress(
     int totalTaskCount,
     int succeededTaskCount,
     int runningTaskCount,
-    int failedTaskCount,
-    int killedTaskCount,
-    long cumulativeTime) {
+    int failedTaskCount) {
 
     this.totalTaskCount = totalTaskCount;
     this.succeededTaskCount = succeededTaskCount;
     this.runningTaskCount = runningTaskCount;
     this.failedTaskCount = failedTaskCount;
-    this.killedTaskCount = killedTaskCount;
-    this.cumulativeTime = cumulativeTime;
   }
 
   public int getTotalTaskCount() {
@@ -58,14 +56,6 @@ public class SparkStageProgress {
     return failedTaskCount;
   }
 
-  public int getKilledTaskCount() {
-    return killedTaskCount;
-  }
-
-  public long getCumulativeTime() {
-    return cumulativeTime;
-  }
-
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof SparkStageProgress) {
@@ -73,8 +63,7 @@ public class SparkStageProgress {
       return getTotalTaskCount() == other.getTotalTaskCount()
         && getSucceededTaskCount() == other.getSucceededTaskCount()
         && getRunningTaskCount() == other.getRunningTaskCount()
-        && getFailedTaskCount() == other.getFailedTaskCount()
-        && getKilledTaskCount() == other.getKilledTaskCount();
+        && getFailedTaskCount() == other.getFailedTaskCount();
     }
     return false;
   }
@@ -90,10 +79,6 @@ public class SparkStageProgress {
     sb.append(getRunningTaskCount());
     sb.append(" Failed: ");
     sb.append(getFailedTaskCount());
-    sb.append(" Killed: ");
-    sb.append(getKilledTaskCount());
-    sb.append(" CumulativeTime: ");
-    sb.append(getCumulativeTime() + "ms");
     return sb.toString();
   }
 }
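
With the killed-task count and cumulative time gone, the progress object carries exactly
the four counters the new SparkStageInfo API can supply, and equals() and toString()
shrink accordingly. A quick usage sketch with invented values:

    SparkStageProgress progress = new SparkStageProgress(
        100,  // totalTaskCount
        40,   // succeededTaskCount
        8,    // runningTaskCount
        2);   // failedTaskCount
    // equals() now compares only these four counters, and toString()
    // stops after the failed-task count.
    assert progress.equals(new SparkStageProgress(100, 40, 8, 2));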

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java?rev=1640190&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java Mon Nov 17 17:03:08 2014
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+public class JobMetricsListener implements SparkListener {
+
+  private final static Log LOG = LogFactory.getLog(JobMetricsListener.class);
+
+  private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
+  private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
+  private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap();
+
+  @Override
+  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+  }
+
+  @Override
+  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+  }
+
+  @Override
+  public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+  }
+
+  @Override
+  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+  }
+
+  @Override
+  public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+    int stageId = taskEnd.stageId();
+    int stageAttemptId = taskEnd.stageAttemptId();
+    String stageIdentifier = stageId + "_" + stageAttemptId;
+    Integer jobId = stageIdToJobId.get(stageId);
+    if (jobId == null) {
+      LOG.warn("Can not find job id for stage[" + stageId + "].");
+    } else {
+      Map<String, List<TaskMetrics>> jobMetrics = allJobMetrics.get(jobId);
+      if (jobMetrics == null) {
+        jobMetrics = Maps.newHashMap();
+        allJobMetrics.put(jobId, jobMetrics);
+      }
+      List<TaskMetrics> stageMetrics = jobMetrics.get(stageIdentifier);
+      if (stageMetrics == null) {
+        stageMetrics = Lists.newLinkedList();
+        jobMetrics.put(stageIdentifier, stageMetrics);
+      }
+      stageMetrics.add(taskEnd.taskMetrics());
+    }
+  }
+
+  @Override
+  public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+    int jobId = jobStart.jobId();
+    int size = jobStart.stageIds().size();
+    int[] intStageIds = new int[size];
+    for (int i = 0; i < size; i++) {
+      Integer stageId = (Integer) jobStart.stageIds().apply(i);
+      intStageIds[i] = stageId;
+      stageIdToJobId.put(stageId, jobId);
+    }
+    jobIdToStageId.put(jobId, intStageIds);
+  }
+
+  @Override
+  public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+
+  }
+
+  @Override
+  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+  }
+
+  @Override
+  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+  }
+
+  @Override
+  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+  }
+
+  @Override
+  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+  }
+
+  @Override
+  public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+  }
+
+  @Override
+  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+  }
+
+  @Override
+  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+  }
+
+  public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) {
+    return allJobMetrics.get(jobId);
+  }
+
+  public synchronized void cleanup(int jobId) {
+    allJobMetrics.remove(jobId);
+    jobIdToStageId.remove(jobId);
+    Iterator<Map.Entry<Integer, Integer>> iterator = stageIdToJobId.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, Integer> entry = iterator.next();
+      if (entry.getValue() == jobId) {
+        iterator.remove();
+      }
+    }
+  }
+}
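
Of the listener callbacks above, only two do real work: onJobStart records the
job-to-stage mapping, and onTaskEnd files each finished task's TaskMetrics under a
"<stageId>_<stageAttemptId>" key. A sketch of the intended life cycle, mirroring how
SparkClient and SimpleSparkJobStatus use the class:

    JobMetricsListener listener = new JobMetricsListener();
    sc.sc().listenerBus().addListener(listener);  // once, in the SparkClient constructor

    // ... after tasks of job jobId have finished ...
    Map<String, List<TaskMetrics>> jobMetrics = listener.getJobMetric(jobId);
    if (jobMetrics != null) {  // null if no task of this job has completed yet
      for (Map.Entry<String, List<TaskMetrics>> e : jobMetrics.entrySet()) {
        System.out.println(e.getKey() + " -> " + e.getValue().size() + " task metrics");
      }
    }
    listener.cleanup(jobId);  // drop per-job state once monitoring is done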

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1640190&r1=1640189&r2=1640190&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Mon Nov 17 17:03:08 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.spark.status.impl;
 
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -26,42 +25,35 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
 import org.apache.spark.api.java.JavaFutureAction;
-import org.apache.spark.executor.InputMetrics;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.StageInfo;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.spark.ui.jobs.UIData;
 
 import scala.Option;
-import scala.Tuple2;
-
-import static scala.collection.JavaConversions.bufferAsJavaList;
-import static scala.collection.JavaConversions.mutableMapAsJavaMap;
 
 public class SimpleSparkJobStatus implements SparkJobStatus {
 
+  private final JavaSparkContext sparkContext;
   private int jobId;
-  private JobStateListener jobStateListener;
-  private JobProgressListener jobProgressListener;
+  // After SPARK-2321, we only use JobMetricsListener to get job metrics
+  // TODO: remove it when the new API provides equivalent functionality
+  private JobMetricsListener jobMetricsListener;
   private SparkCounters sparkCounters;
   private JavaFutureAction<Void> future;
 
-  public SimpleSparkJobStatus(
-    int jobId,
-    JobStateListener stateListener,
-    JobProgressListener progressListener,
-    SparkCounters sparkCounters,
-    JavaFutureAction<Void> future) {
-
+  public SimpleSparkJobStatus(JavaSparkContext sparkContext, int jobId,
+      JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
+      JavaFutureAction<Void> future) {
+    this.sparkContext = sparkContext;
     this.jobId = jobId;
-    this.jobStateListener = stateListener;
-    this.jobProgressListener = progressListener;
+    this.jobMetricsListener = jobMetricsListener;
     this.sparkCounters = sparkCounters;
     this.future = future;
   }
@@ -72,62 +64,39 @@ public class SimpleSparkJobStatus implem
   }
 
   @Override
-  public SparkJobState getState() {
+  public JobExecutionStatus getState() {
     // A Spark job with empty source data is never actually submitted, so we would never
     // receive JobStart/JobEnd events; use the JavaFutureAction to get the current
     // job state.
     if (future.isDone()) {
-      return SparkJobState.SUCCEEDED;
+      return JobExecutionStatus.SUCCEEDED;
     } else {
-      return jobStateListener.getJobState(jobId);
+      // SparkJobInfo may not be available yet
+      SparkJobInfo sparkJobInfo = getJobInfo();
+      return sparkJobInfo == null ? null : sparkJobInfo.status();
     }
   }
 
   @Override
   public int[] getStageIds() {
-    return jobStateListener.getStageIds(jobId);
+    SparkJobInfo sparkJobInfo = getJobInfo();
+    return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds();
   }
 
   @Override
   public Map<String, SparkStageProgress> getSparkStageProgress() {
     Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
-    int[] stageIds = jobStateListener.getStageIds(jobId);
-    if (stageIds != null) {
-      for (int stageId : stageIds) {
-        List<StageInfo> stageInfos = getStageInfo(stageId);
-        for (StageInfo stageInfo : stageInfos) {
-          Tuple2<Object, Object> tuple2 = new Tuple2<Object, Object>(stageInfo.stageId(),
-            stageInfo.attemptId());
-          UIData.StageUIData uiData = jobProgressListener.stageIdToData().get(tuple2).get();
-          if (uiData != null) {
-            int runningTaskCount = uiData.numActiveTasks();
-            int completedTaskCount = uiData.numCompleteTasks();
-            int failedTaskCount = uiData.numFailedTasks();
-            int totalTaskCount = stageInfo.numTasks();
-            int killedTaskCount = 0;
-            long costTime;
-            Option<Object> startOption = stageInfo.submissionTime();
-            Option<Object> completeOption = stageInfo.completionTime();
-            if (startOption.isEmpty()) {
-              costTime = 0;
-            } else if (completeOption.isEmpty()) {
-              long startTime = (Long)startOption.get();
-              costTime = System.currentTimeMillis() - startTime;
-            } else {
-              long startTime = (Long)startOption.get();
-              long completeTime = (Long)completeOption.get();
-              costTime = completeTime - startTime;
-            }
-            SparkStageProgress stageProgress = new SparkStageProgress(
-              totalTaskCount,
-              completedTaskCount,
-              runningTaskCount,
-              failedTaskCount,
-              killedTaskCount,
-              costTime);
-            stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress);
-          }
-        }
+    for (int stageId : getStageIds()) {
+      SparkStageInfo sparkStageInfo = getStageInfo(stageId);
+      if (sparkStageInfo != null) {
+        int runningTaskCount = sparkStageInfo.numActiveTasks();
+        int completedTaskCount = sparkStageInfo.numCompletedTasks();
+        int failedTaskCount = sparkStageInfo.numFailedTasks();
+        int totalTaskCount = sparkStageInfo.numTasks();
+        SparkStageProgress sparkStageProgress = new SparkStageProgress(
+            totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
+            sparkStageInfo.currentAttemptId(), sparkStageProgress);
       }
     }
     return stageProgresses;
@@ -145,7 +114,7 @@ public class SimpleSparkJobStatus implem
     sparkStatisticsBuilder.add(sparkCounters);
     // add spark job metrics.
     String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
-    Map<String, List<TaskMetrics>> jobMetric = jobStateListener.getJobMetric(jobId);
+    Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
     if (jobMetric == null) {
       return null;
     }
@@ -160,7 +129,7 @@ public class SimpleSparkJobStatus implem
 
   @Override
   public void cleanup() {
-    jobStateListener.cleanup(jobId);
+    jobMetricsListener.cleanup(jobId);
   }
 
   private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
@@ -242,29 +211,11 @@ public class SimpleSparkJobStatus implem
     return results;
   }
 
-  private List<StageInfo> getStageInfo(int stageId) {
-    List<StageInfo> stageInfos = new LinkedList<StageInfo>();
-
-    Map<Object, StageInfo> activeStages = mutableMapAsJavaMap(jobProgressListener.activeStages());
-    List<StageInfo> completedStages = bufferAsJavaList(jobProgressListener.completedStages());
-    List<StageInfo> failedStages = bufferAsJavaList(jobProgressListener.failedStages());
-
-    if (activeStages.containsKey(stageId)) {
-      stageInfos.add(activeStages.get(stageId));
-    } else {
-      for (StageInfo stageInfo : completedStages) {
-        if (stageInfo.stageId() == stageId) {
-          stageInfos.add(stageInfo);
-        }
-      }
-
-      for (StageInfo stageInfo : failedStages) {
-        if (stageInfo.stageId() == stageId) {
-          stageInfos.add(stageInfo);
-        }
-      }
-    }
+  private SparkJobInfo getJobInfo() {
+    return sparkContext.statusTracker().getJobInfo(jobId);
+  }
 
-    return stageInfos;
+  private SparkStageInfo getStageInfo(int stageId) {
+    return sparkContext.statusTracker().getStageInfo(stageId);
   }
 }
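
A behavioral note on this class after the switch: the statusTracker() lookups return null
until Spark has registered the job, which is why getState() can now return null and
getStageIds() falls back to an empty array. Callers such as SparkJobMonitor must tolerate
that window; a minimal defensive helper one could add (hypothetical, not in the patch;
back-off value illustrative):

    // Wait until the tracker knows the job, or the underlying action finishes.
    private SparkJobInfo waitForJobInfo() throws InterruptedException {
      SparkJobInfo info = sparkContext.statusTracker().getJobInfo(jobId);
      while (info == null && !future.isDone()) {
        Thread.sleep(100);  // illustrative back-off
        info = sparkContext.statusTracker().getJobInfo(jobId);
      }
      return info;  // may still be null if the job was never submitted
    }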