Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/14 04:53:44 UTC

svn commit: r1631607 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: ./ status/ status/impl/

Author: xuefu
Date: Tue Oct 14 02:53:43 2014
New Revision: 1631607

URL: http://svn.apache.org/r1631607
Log:
HIVE-7439: Spark job monitoring and error reporting [Spark Branch] (Chengxiang via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1631607&r1=1631606&r2=1631607&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Tue Oct 14 02:53:43 2014
@@ -29,12 +29,21 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.FutureAction;
+import org.apache.spark.SimpleFutureAction;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.ui.jobs.JobProgressListener;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -69,8 +78,17 @@ public class SparkClient implements Seri
 
   private List<String> localFiles = new ArrayList<String>();
 
+  private JobStateListener jobStateListener;
+
+  private JobProgressListener jobProgressListener;
+
   private SparkClient(Configuration hiveConf) {
-    sc = new JavaSparkContext(initiateSparkConf(hiveConf));
+    SparkConf sparkConf = initiateSparkConf(hiveConf);
+    sc = new JavaSparkContext(sparkConf);
+    jobStateListener = new JobStateListener();
+    jobProgressListener = new JobProgressListener(sparkConf);
+    sc.sc().listenerBus().addListener(jobStateListener);
+    sc.sc().listenerBus().addListener(jobProgressListener);
   }
 
   private SparkConf initiateSparkConf(Configuration hiveConf) {
@@ -161,7 +179,15 @@ public class SparkClient implements Seri
 
     // Execute generated plan.
     try {
-      plan.execute();
+      JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+      // We use a Spark RDD async action to submit the job, as it's currently the only way to get the jobId.
+      FutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+      // An action may trigger multiple jobs in Spark; we only monitor the latest job here
+      // until we find that Hive does trigger multiple jobs.
+      SimpleSparkJobStatus sparkJobStatus = new SimpleSparkJobStatus(
+        (Integer) future.jobIds().last(), jobStateListener, jobProgressListener);
+      SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
+      monitor.startMonitor();
     } catch (Exception e) {
       LOG.error("Error executing Spark Plan", e);
       return 1;

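For readers skimming the diff, the hunk above replaces the blocking plan.execute() call with a
submit-then-monitor flow. A minimal sketch of that flow (not part of this commit; it assumes the
listeners are already registered on the listener bus as done in the SparkClient constructor, and
reuses only calls that appear in this patch):

    JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
    FutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
    // The async action exposes the Spark job ids it triggered; monitor the latest one.
    int jobId = (Integer) future.jobIds().last();
    SparkJobStatus status = new SimpleSparkJobStatus(jobId, jobStateListener, jobProgressListener);
    int rc = new SparkJobMonitor(status).startMonitor();  // 0 = succeeded, non-zero = killed/failed
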
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1631607&r1=1631606&r2=1631607&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Tue Oct 14 02:53:43 2014
@@ -38,7 +38,7 @@ public class SparkPlan {
   private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
   private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
 
-  public void execute() throws IllegalStateException {
+  public JavaPairRDD<HiveKey, BytesWritable> generateGraph() throws IllegalStateException {
     Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToOutputRDDMap
         = new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
     for (SparkTran tran : getAllTrans()) {
@@ -74,7 +74,7 @@ public class SparkPlan {
       }
     }
 
-    finalRDD.foreach(HiveVoidFunction.getInstance());
+    return finalRDD;
   }
 
   public void addTran(SparkTran tran) {

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+/**
+ * SparkJobMonitor monitors the status of a single Spark job in a loop until the job finishes,
+ * fails, or is killed. It prints the current job status to the console and sleeps between checks.
+ */
+public class SparkJobMonitor {
+
+  private static final String CLASS_NAME = SparkJobMonitor.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  private transient LogHelper console;
+  private final int checkInterval = 200;
+  private final int maxRetryInterval = 2500;
+  private final int printInterval = 3000;
+  private long lastPrintTime;
+  private Set<String> completed;
+
+  private SparkJobStatus sparkJobStatus;
+
+  public SparkJobMonitor(SparkJobStatus sparkJobStatus) {
+    this.sparkJobStatus = sparkJobStatus;
+    console = new LogHelper(LOG);
+  }
+
+  public int startMonitor() {
+    completed = new HashSet<String>();
+
+    boolean running = false;
+    boolean done = false;
+    int failedCounter = 0;
+    int rc = 0;
+    SparkJobState lastState = null;
+    String lastReport = null;
+    long startTime = 0;
+
+    while(true) {
+
+      try {
+        Map<String, SparkProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+        SparkJobState state = sparkJobStatus.getState();
+
+        if (state != lastState || state == SparkJobState.RUNNING) {
+          lastState = state;
+
+          switch(state) {
+          case SUBMITTED:
+            console.printInfo("Status: Submitted");
+            break;
+          case INITING:
+            console.printInfo("Status: Initializing");
+            break;
+          case RUNNING:
+            if (!running) {
+              // print job stages.
+              console.printInfo("\nQuery Hive on Spark job[" +
+                sparkJobStatus.getJobId() + "] stages:");
+              for (int stageId : sparkJobStatus.getStageIds()) {
+                console.printInfo(Integer.toString(stageId));
+              }
+
+              console.printInfo("\nStatus: Running (Hive on Spark job[" +
+                sparkJobStatus.getJobId() + "])\n");
+              startTime = System.currentTimeMillis();
+              running = true;
+            }
+
+            lastReport = printStatus(progressMap, lastReport, console);
+            break;
+          case SUCCEEDED:
+            lastReport = printStatus(progressMap, lastReport, console);
+            double duration = (System.currentTimeMillis() - startTime)/1000.0;
+            console.printInfo("Status: Finished successfully in " +
+              String.format("%.2f seconds", duration));
+            running = false;
+            done = true;
+            break;
+          case KILLED:
+            console.printInfo("Status: Killed");
+            running = false;
+            done = true;
+            rc = 1;
+            break;
+          case FAILED:
+          case ERROR:
+            console.printError("Status: Failed");
+            running = false;
+            done = true;
+            rc = 2;
+            break;
+          }
+        }
+        if (!done) {
+          Thread.sleep(checkInterval);
+        }
+      } catch (Exception e) {
+        console.printInfo("Exception: "+e.getMessage());
+        if (++failedCounter % (maxRetryInterval / checkInterval) == 0
+          || e instanceof InterruptedException) {
+          console.printInfo("Killing Job...");
+          console.printError("Execution has failed.");
+          rc = 1;
+          done = true;
+        } else {
+          console.printInfo("Retrying...");
+        }
+      } finally {
+        if (done) {
+          break;
+        }
+      }
+    }
+    return rc;
+  }
+
+  private String printStatus(
+    Map<String, SparkProgress> progressMap,
+    String lastReport,
+    LogHelper console) {
+
+    StringBuffer reportBuffer = new StringBuffer();
+
+    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+    for (String s: keys) {
+      SparkProgress progress = progressMap.get(s);
+      final int complete = progress.getSucceededTaskCount();
+      final int total = progress.getTotalTaskCount();
+      final int running = progress.getRunningTaskCount();
+      final int failed = progress.getFailedTaskCount();
+      if (total <= 0) {
+        reportBuffer.append(String.format("%s: -/-\t", s));
+      } else {
+        if (complete == total && !completed.contains(s)) {
+          completed.add(s);
+        }
+        if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
+          /* stage is started, but not complete */
+          if (failed > 0) {
+            reportBuffer.append(
+              String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+          } else {
+            reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+          }
+        } else {
+          /* stage is waiting for input/slots or complete */
+          if (failed > 0) {
+            /* tasks finished but some failed */
+            reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
+          } else {
+            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+          }
+        }
+      }
+    }
+
+    String report = reportBuffer.toString();
+    if (!report.equals(lastReport)
+      || System.currentTimeMillis() >= lastPrintTime + printInterval) {
+      console.printInfo(report);
+      lastPrintTime = System.currentTimeMillis();
+    }
+
+    return report;
+  }
+}
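
As a reading aid for the report lines produced by printStatus() above: each stage is keyed as
"stageId_attemptId" (see SimpleSparkJobStatus below) and rendered as completed(+running,-failed)/total,
with entries separated by tabs. For example, a hypothetical report line such as

    2_0: 10/10    3_0: 4(+3,-1)/10    4_0: -/-

means stage 2 finished all 10 tasks, stage 3 has completed 4 of 10 tasks with 3 running and 1 failed,
and stage 4 has not reported a task count yet. The line is printed whenever it changes, and re-printed
at least every printInterval (3 seconds) even if unchanged.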

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+public enum SparkJobState {
+
+  SUBMITTED,
+  INITING,
+  RUNNING,
+  SUCCEEDED,
+  KILLED,
+  FAILED,
+  ERROR,
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.Map;
+
+/**
+ * SparkJobStatus describes what Hive wants to know about the status of a Spark job.
+ */
+public interface SparkJobStatus {
+
+  public int getJobId();
+
+  public SparkJobState getState();
+
+  public SparkProgress getSparkJobProgress();
+
+  public int[] getStageIds();
+
+  public Map<String, SparkProgress> getSparkStageProgress();
+
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+public class SparkProgress {
+
+  private int totalTaskCount;
+  private int succeededTaskCount;
+  private int runningTaskCount;
+  private int failedTaskCount;
+  private int killedTaskCount;
+
+  public SparkProgress(
+    int totalTaskCount,
+    int succeededTaskCount,
+    int runningTaskCount,
+    int failedTaskCount,
+    int killedTaskCount) {
+
+    this.totalTaskCount = totalTaskCount;
+    this.succeededTaskCount = succeededTaskCount;
+    this.runningTaskCount = runningTaskCount;
+    this.failedTaskCount = failedTaskCount;
+    this.killedTaskCount = killedTaskCount;
+  }
+
+  public int getTotalTaskCount() {
+    return totalTaskCount;
+  }
+
+  public int getSucceededTaskCount() {
+    return succeededTaskCount;
+  }
+
+  public int getRunningTaskCount() {
+    return runningTaskCount;
+  }
+
+  public int getFailedTaskCount() {
+    return failedTaskCount;
+  }
+
+  public int getKilledTaskCount() {
+    return killedTaskCount;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof SparkProgress) {
+      SparkProgress other = (SparkProgress) obj;
+      return getTotalTaskCount() == other.getTotalTaskCount()
+        && getSucceededTaskCount() == other.getSucceededTaskCount()
+        && getRunningTaskCount() == other.getRunningTaskCount()
+        && getFailedTaskCount() == other.getFailedTaskCount()
+        && getKilledTaskCount() == other.getKilledTaskCount();
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("TotalTasks: ");
+    sb.append(getTotalTaskCount());
+    sb.append(" Succeeded: ");
+    sb.append(getSucceededTaskCount());
+    sb.append(" Running: ");
+    sb.append(getRunningTaskCount());
+    sb.append(" Failed: ");
+    sb.append(getFailedTaskCount());
+    sb.append(" Killed: ");
+    sb.append(getKilledTaskCount());
+    return sb.toString();
+  }
+}
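
SparkProgress overrides equals() but not hashCode(); in this patch instances are only used as map
values, never as keys, so that is harmless, but a matching hashCode() would keep the equals/hashCode
contract intact. A possible sketch, not part of this commit:

    @Override
    public int hashCode() {
      // combine the same fields that equals() compares
      int result = totalTaskCount;
      result = 31 * result + succeededTaskCount;
      result = 31 * result + runningTaskCount;
      result = 31 * result + failedTaskCount;
      result = 31 * result + killedTaskCount;
      return result;
    }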

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.spark.scheduler.JobSucceeded;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+import scala.collection.JavaConversions;
+
+public class JobStateListener implements SparkListener {
+
+  private Map<Integer, SparkJobState> jobIdToStates = new HashMap<Integer, SparkJobState>();
+  private Map<Integer, int[]> jobIdToStageId = new HashMap<Integer, int[]>();
+
+  @Override
+  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+  }
+
+  @Override
+  public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+  }
+
+  @Override
+  public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+  }
+
+  @Override
+  public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+  }
+
+  @Override
+  public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+
+  }
+
+  @Override
+  public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+    jobIdToStates.put(jobStart.jobId(), SparkJobState.RUNNING);
+    List<Object> ids = JavaConversions.asJavaList(jobStart.stageIds());
+    int[] intStageIds = new int[ids.size()];
+    for(int i=0; i<ids.size(); i++) {
+      intStageIds[i] = (Integer)ids.get(i);
+    }
+    jobIdToStageId.put(jobStart.jobId(), intStageIds);
+  }
+
+  @Override
+  public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+    // JobSucceeded is a Scala singleton object, so its runtime class name carries a trailing "$".
+    if (jobEnd.jobResult().getClass().getName().equals(JobSucceeded.class.getName() + "$")) {
+      jobIdToStates.put(jobEnd.jobId(), SparkJobState.SUCCEEDED);
+    } else {
+      jobIdToStates.put(jobEnd.jobId(), SparkJobState.FAILED);
+    }
+  }
+
+  @Override
+  public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+  }
+
+  @Override
+  public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+  }
+
+  @Override
+  public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+  }
+
+  @Override
+  public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+  }
+
+  @Override
+  public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+  }
+
+  @Override
+  public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+  }
+
+  @Override
+  public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+  }
+
+  public synchronized SparkJobState getJobState(int jobId) {
+    return jobIdToStates.get(jobId);
+  }
+
+  public synchronized int[] getStageIds(int jobId) {
+    return jobIdToStageId.get(jobId);
+  }
+}
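
The onJobEnd() check above works because the Scala compiler emits the singleton instance of
case object JobSucceeded as the class org.apache.spark.scheduler.JobSucceeded$, while
JobSucceeded.class.getName() seen from Java gives the name without the trailing "$". An equivalent
sketch that compares against the generated singleton instance instead of class names (assuming the
MODULE$ field the Scala compiler generates for objects):

    // reference comparison against the Scala singleton instance
    if (jobEnd.jobResult() == org.apache.spark.scheduler.JobSucceeded$.MODULE$) {
      jobIdToStates.put(jobEnd.jobId(), SparkJobState.SUCCEEDED);
    } else {
      jobIdToStates.put(jobEnd.jobId(), SparkJobState.FAILED);
    }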

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkProgress;
+import org.apache.spark.scheduler.StageInfo;
+import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.spark.ui.jobs.UIData;
+
+import scala.Tuple2;
+
+import static scala.collection.JavaConversions.bufferAsJavaList;
+import static scala.collection.JavaConversions.mutableMapAsJavaMap;
+
+public class SimpleSparkJobStatus implements SparkJobStatus {
+
+  private int jobId;
+  private JobStateListener jobStateListener;
+  private JobProgressListener jobProgressListener;
+
+  public SimpleSparkJobStatus(
+    int jobId,
+    JobStateListener stateListener,
+    JobProgressListener progressListener) {
+
+    this.jobId = jobId;
+    this.jobStateListener = stateListener;
+    this.jobProgressListener = progressListener;
+  }
+
+  @Override
+  public int getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public SparkJobState getState() {
+    return jobStateListener.getJobState(jobId);
+  }
+
+  @Override
+  public SparkProgress getSparkJobProgress() {
+    Map<String, SparkProgress> stageProgresses = getSparkStageProgress();
+
+    int totalTaskCount = 0;
+    int runningTaskCount = 0;
+    int completedTaskCount = 0;
+    int failedTaskCount = 0;
+    int killedTaskCount = 0;
+
+    for (SparkProgress sparkProgress : stageProgresses.values()) {
+      totalTaskCount += sparkProgress.getTotalTaskCount();
+      runningTaskCount += sparkProgress.getRunningTaskCount();
+      completedTaskCount += sparkProgress.getSucceededTaskCount();
+      failedTaskCount += sparkProgress.getFailedTaskCount();
+      killedTaskCount += sparkProgress.getKilledTaskCount();
+    }
+
+    return new SparkProgress(
+      totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount, killedTaskCount);
+  }
+
+  @Override
+  public int[] getStageIds() {
+    return jobStateListener.getStageIds(jobId);
+  }
+
+  @Override
+  public Map<String, SparkProgress> getSparkStageProgress() {
+    Map<String, SparkProgress> stageProgresses = new HashMap<String, SparkProgress>();
+    int[] stageIds = jobStateListener.getStageIds(jobId);
+    if (stageIds != null) {
+      for (int stageId : stageIds) {
+        List<StageInfo> stageInfos = getStageInfo(stageId);
+        for (StageInfo stageInfo : stageInfos) {
+          Tuple2<Object, Object> tuple2 = new Tuple2<Object, Object>(stageInfo.stageId(),
+            stageInfo.attemptId());
+          scala.Option<UIData.StageUIData> uiData = jobProgressListener.stageIdToData().get(tuple2);
+          if (uiData.isDefined()) {
+            int runningTaskCount = uiData.get().numActiveTasks();
+            int completedTaskCount = uiData.get().numCompleteTasks();
+            int failedTaskCount = uiData.get().numFailedTasks();
+            int totalTaskCount = stageInfo.numTasks();
+            int killedTaskCount = 0;
+            SparkProgress stageProgress = new SparkProgress(
+              totalTaskCount,
+              completedTaskCount,
+              runningTaskCount,
+              failedTaskCount,
+              killedTaskCount);
+            stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress);
+          }
+        }
+      }
+    }
+    return stageProgresses;
+  }
+
+  private List<StageInfo> getStageInfo(int stageId) {
+    List<StageInfo> stageInfos = new LinkedList<StageInfo>();
+
+    Map<Object, StageInfo> activeStages = mutableMapAsJavaMap(jobProgressListener.activeStages());
+    List<StageInfo> completedStages = bufferAsJavaList(jobProgressListener.completedStages());
+    List<StageInfo> failedStages = bufferAsJavaList(jobProgressListener.failedStages());
+
+    if (activeStages.containsKey(stageId)) {
+      stageInfos.add(activeStages.get(stageId));
+    } else {
+      for (StageInfo stageInfo : completedStages) {
+        if (stageInfo.stageId() == stageId) {
+          stageInfos.add(stageInfo);
+        }
+      }
+
+      for (StageInfo stageInfo : failedStages) {
+        if (stageInfo.stageId() == stageId) {
+          stageInfos.add(stageInfo);
+        }
+      }
+    }
+
+    return stageInfos;
+  }
+}