Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/25 05:18:26 UTC

svn commit: r1641525 - in /hive/branches/spark: ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/ spark-client/src/main/java/org/apac...

Author: xuefu
Date: Tue Nov 25 04:18:26 2014
New Revision: 1641525

URL: http://svn.apache.org/r1641525
Log:
HIVE-8834: enable job progress monitoring of Remote Spark Context [Spark Branch] (Rui via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java
Removed:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Tue Nov 25 04:18:26 2014
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -142,10 +142,10 @@ public class LocalHiveSparkClient implem
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.
     JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
-    // As we always use foreach action to submit RDD graph, it would only trigger on job.
+    // As we always use foreach action to submit RDD graph, it would only trigger one job.
     int jobId = future.jobIds().get(0);
-    SimpleSparkJobStatus sparkJobStatus =
-      new SimpleSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future);
+    LocalSparkJobStatus sparkJobStatus =
+      new LocalSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future);
     return new SparkJobRef(Integer.toString(jobId), sparkJobStatus);
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Tue Nov 25 04:18:26 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -122,8 +123,7 @@ public class RemoteHiveSparkClient imple
         return null;
       }
     });
-    jobHandle.get();
-    return new SparkJobRef(jobHandle.getClientJobId());
+    return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle));
   }
 
   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Tue Nov 25 04:18:26 2014
@@ -62,12 +62,13 @@ public class SparkJobMonitor {
     int rc = 0;
     JobExecutionStatus lastState = null;
     Map<String, SparkStageProgress> lastProgressMap = null;
-    long startTime = 0;
+    long startTime = -1;
 
     while (true) {
       try {
         JobExecutionStatus state = sparkJobStatus.getState();
-        if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
+        if (state != null && state != JobExecutionStatus.UNKNOWN &&
+            (state != lastState || state == JobExecutionStatus.RUNNING)) {
           lastState = state;
           Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
 
@@ -97,9 +98,13 @@ public class SparkJobMonitor {
           case SUCCEEDED:
             printStatus(progressMap, lastProgressMap);
             lastProgressMap = progressMap;
-            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-            console.printInfo("Status: Finished successfully in " +
-              String.format("%.2f seconds", duration));
+            if (startTime < 0) {
+              console.printInfo("Status: Finished successfully within a check interval.");
+            } else {
+              double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+              console.printInfo("Status: Finished successfully in " +
+                  String.format("%.2f seconds", duration));
+            }
             running = false;
             done = true;
             break;

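For context, the monitor drives everything by polling the SparkJobStatus abstraction, so the same loop now works for both the local and the remote implementations added below. A minimal standalone sketch of that polling pattern (the one-second interval and console output here are illustrative, not the actual SparkJobMonitor settings):

    import java.util.Map;

    import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
    import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
    import org.apache.spark.JobExecutionStatus;

    public class SparkJobStatusPollSketch {
      // Polls a SparkJobStatus until the job reaches a terminal state.
      static void poll(SparkJobStatus status) throws InterruptedException {
        while (true) {
          JobExecutionStatus state = status.getState();
          // The remote status may briefly be UNKNOWN (or null locally) before
          // the Spark job is actually submitted; wait and retry.
          if (state == null || state == JobExecutionStatus.UNKNOWN) {
            Thread.sleep(1000);
            continue;
          }
          for (Map.Entry<String, SparkStageProgress> e
              : status.getSparkStageProgress().entrySet()) {
            System.out.println("Stage " + e.getKey() + ": " + e.getValue());
          }
          if (state != JobExecutionStatus.RUNNING) {
            break; // SUCCEEDED or FAILED
          }
          Thread.sleep(1000);
        }
        status.cleanup();
      }
    }
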
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1641525&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Tue Nov 25 04:18:26 2014
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+
+import scala.Option;
+
+public class LocalSparkJobStatus implements SparkJobStatus {
+
+  private final JavaSparkContext sparkContext;
+  private int jobId;
+  // After SPARK-2321, we only use JobMetricsListener to get job metrics
+  // TODO: remove it when the new API provides equivalent functionality
+  private JobMetricsListener jobMetricsListener;
+  private SparkCounters sparkCounters;
+  private JavaFutureAction<Void> future;
+
+  public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
+      JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
+      JavaFutureAction<Void> future) {
+    this.sparkContext = sparkContext;
+    this.jobId = jobId;
+    this.jobMetricsListener = jobMetricsListener;
+    this.sparkCounters = sparkCounters;
+    this.future = future;
+  }
+
+  @Override
+  public int getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public JobExecutionStatus getState() {
+    // A Spark job with empty source data is never actually submitted, so we would
+    // never receive JobStart/JobEnd events in JobStateListener. Use the JavaFutureAction
+    // to get the current job state instead.
+    if (future.isDone()) {
+      return JobExecutionStatus.SUCCEEDED;
+    } else {
+      // SparkJobInfo may not be available yet
+      SparkJobInfo sparkJobInfo = getJobInfo();
+      return sparkJobInfo == null ? null : sparkJobInfo.status();
+    }
+  }
+
+  @Override
+  public int[] getStageIds() {
+    SparkJobInfo sparkJobInfo = getJobInfo();
+    return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds();
+  }
+
+  @Override
+  public Map<String, SparkStageProgress> getSparkStageProgress() {
+    Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
+    for (int stageId : getStageIds()) {
+      SparkStageInfo sparkStageInfo = getStageInfo(stageId);
+      if (sparkStageInfo != null) {
+        int runningTaskCount = sparkStageInfo.numActiveTasks();
+        int completedTaskCount = sparkStageInfo.numCompletedTasks();
+        int failedTaskCount = sparkStageInfo.numFailedTasks();
+        int totalTaskCount = sparkStageInfo.numTasks();
+        SparkStageProgress sparkStageProgress = new SparkStageProgress(
+            totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
+            sparkStageInfo.currentAttemptId(), sparkStageProgress);
+      }
+    }
+    return stageProgresses;
+  }
+
+  @Override
+  public SparkCounters getCounter() {
+    return sparkCounters;
+  }
+
+  @Override
+  public SparkStatistics getSparkStatistics() {
+    SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+    // add Hive operator level statistics.
+    sparkStatisticsBuilder.add(sparkCounters);
+    // add spark job metrics.
+    String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
+    Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
+    if (jobMetric == null) {
+      return null;
+    }
+
+    Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+    for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
+      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+    }
+
+    return sparkStatisticsBuilder.build();
+  }
+
+  @Override
+  public void cleanup() {
+    jobMetricsListener.cleanup(jobId);
+  }
+
+  private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+    Map<String, Long> results = Maps.newLinkedHashMap();
+
+    long executorDeserializeTime = 0;
+    long executorRunTime = 0;
+    long resultSize = 0;
+    long jvmGCTime = 0;
+    long resultSerializationTime = 0;
+    long memoryBytesSpilled = 0;
+    long diskBytesSpilled = 0;
+    long bytesRead = 0;
+    long remoteBlocksFetched = 0;
+    long localBlocksFetched = 0;
+    long fetchWaitTime = 0;
+    long remoteBytesRead = 0;
+    long shuffleBytesWritten = 0;
+    long shuffleWriteTime = 0;
+    boolean inputMetricExist = false;
+    boolean shuffleReadMetricExist = false;
+    boolean shuffleWriteMetricExist = false;
+
+    for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+      if (stageMetric != null) {
+        for (TaskMetrics taskMetrics : stageMetric) {
+          if (taskMetrics != null) {
+            executorDeserializeTime += taskMetrics.executorDeserializeTime();
+            executorRunTime += taskMetrics.executorRunTime();
+            resultSize += taskMetrics.resultSize();
+            jvmGCTime += taskMetrics.jvmGCTime();
+            resultSerializationTime += taskMetrics.resultSerializationTime();
+            memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+            diskBytesSpilled += taskMetrics.diskBytesSpilled();
+            if (!taskMetrics.inputMetrics().isEmpty()) {
+              inputMetricExist = true;
+              bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+            }
+            Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+            if (!shuffleReadMetricsOption.isEmpty()) {
+              shuffleReadMetricExist = true;
+              remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+              localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+              fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+              remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+            }
+            Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+            if (!shuffleWriteMetricsOption.isEmpty()) {
+              shuffleWriteMetricExist = true;
+              shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+              shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+            }
+          }
+        }
+      }
+    }
+
+    results.put("EexcutorDeserializeTime", executorDeserializeTime);
+    results.put("ExecutorRunTime", executorRunTime);
+    results.put("ResultSize", resultSize);
+    results.put("JvmGCTime", jvmGCTime);
+    results.put("ResultSerializationTime", resultSerializationTime);
+    results.put("MemoryBytesSpilled", memoryBytesSpilled);
+    results.put("DiskBytesSpilled", diskBytesSpilled);
+    if (inputMetricExist) {
+      results.put("BytesRead", bytesRead);
+    }
+    if (shuffleReadMetricExist) {
+      results.put("RemoteBlocksFetched", remoteBlocksFetched);
+      results.put("LocalBlocksFetched", localBlocksFetched);
+      results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+      results.put("FetchWaitTime", fetchWaitTime);
+      results.put("RemoteBytesRead", remoteBytesRead);
+    }
+    if (shuffleWriteMetricExist) {
+      results.put("ShuffleBytesWritten", shuffleBytesWritten);
+      results.put("ShuffleWriteTime", shuffleWriteTime);
+    }
+    return results;
+  }
+
+  private SparkJobInfo getJobInfo() {
+    return sparkContext.statusTracker().getJobInfo(jobId);
+  }
+
+  private SparkStageInfo getStageInfo(int stageId) {
+    return sparkContext.statusTracker().getStageInfo(stageId);
+  }
+}

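LocalSparkJobStatus reads everything from the status-tracking API introduced by SPARK-2321. For reference, the same pattern in isolation, as a hypothetical self-contained program (the local master, toy data set, and poll interval are illustrative):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkJobInfo;
    import org.apache.spark.api.java.JavaFutureAction;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;

    public class StatusTrackerSketch {
      public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setMaster("local").setAppName("status-tracker-sketch"));
        // An async action is used because it is the only way to learn the job ID
        // up front -- the same reason LocalHiveSparkClient uses foreachAsync.
        JavaFutureAction<Void> future = sc.parallelize(Arrays.asList(1, 2, 3))
            .foreachAsync(new VoidFunction<Integer>() {
              @Override
              public void call(Integer i) {
              }
            });
        int jobId = future.jobIds().get(0);
        while (!future.isDone()) {
          // getJobInfo may return null before the job is registered.
          SparkJobInfo info = sc.statusTracker().getJobInfo(jobId);
          if (info != null) {
            System.out.println("Job " + jobId + ": " + info.status());
          }
          Thread.sleep(500);
        }
        sc.stop();
      }
    }
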
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1641525&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Tue Nov 25 04:18:26 2014
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.hive.spark.client.Job;
+import org.apache.hive.spark.client.JobContext;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.client.status.HiveSparkJobInfo;
+import org.apache.hive.spark.client.status.HiveSparkStageInfo;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used with the remote Spark client.
+ */
+public class RemoteSparkJobStatus implements SparkJobStatus {
+  private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
+  private final SparkClient sparkClient;
+  private final JobHandle<Serializable> jobHandle;
+
+  public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle) {
+    this.sparkClient = sparkClient;
+    this.jobHandle = jobHandle;
+  }
+
+  @Override
+  public int getJobId() {
+    return jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : -1;
+  }
+
+  @Override
+  public JobExecutionStatus getState() {
+    SparkJobInfo sparkJobInfo = getSparkJobInfo();
+    return sparkJobInfo != null ? sparkJobInfo.status() : JobExecutionStatus.UNKNOWN;
+  }
+
+  @Override
+  public int[] getStageIds() {
+    SparkJobInfo sparkJobInfo = getSparkJobInfo();
+    return sparkJobInfo != null ? sparkJobInfo.stageIds() : new int[0];
+  }
+
+  @Override
+  public Map<String, SparkStageProgress> getSparkStageProgress() {
+    Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
+    for (int stageId : getStageIds()) {
+      SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId);
+      if (sparkStageInfo != null && sparkStageInfo.name() != null) {
+        int runningTaskCount = sparkStageInfo.numActiveTasks();
+        int completedTaskCount = sparkStageInfo.numCompletedTasks();
+        int failedTaskCount = sparkStageInfo.numFailedTasks();
+        int totalTaskCount = sparkStageInfo.numTasks();
+        SparkStageProgress sparkStageProgress = new SparkStageProgress(
+            totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
+            sparkStageInfo.currentAttemptId(), sparkStageProgress);
+      }
+    }
+    return stageProgresses;
+  }
+
+  @Override
+  public SparkCounters getCounter() {
+    return null;
+  }
+
+  @Override
+  public SparkStatistics getSparkStatistics() {
+    return null;
+  }
+
+  @Override
+  public void cleanup() {
+
+  }
+
+  private SparkJobInfo getSparkJobInfo() {
+    Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
+        jobHandle.getSparkJobIds().get(0) : null;
+    if (sparkJobId == null) {
+      return null;
+    }
+    JobHandle<HiveSparkJobInfo> getJobInfo = sparkClient.submit(
+        new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
+    try {
+      return getJobInfo.get();
+    } catch (Throwable t) {
+      LOG.warn("Error getting job info", t);
+      return null;
+    }
+  }
+
+  private SparkStageInfo getSparkStageInfo(int stageId) {
+    JobHandle<HiveSparkStageInfo> getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId));
+    try {
+      return getStageInfo.get();
+    } catch (Throwable t) {
+      LOG.warn("Error getting stage info", t);
+      return null;
+    }
+  }
+
+  private static class GetJobInfoJob implements Job<HiveSparkJobInfo> {
+    private final String clientJobId;
+    private final int sparkJobId;
+
+    GetJobInfoJob(String clientJobId, int sparkJobId) {
+      this.clientJobId = clientJobId;
+      this.sparkJobId = sparkJobId;
+    }
+
+    @Override
+    public HiveSparkJobInfo call(JobContext jc) throws Exception {
+      SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+      if (jobInfo == null) {
+        List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId);
+        if (list != null && list.size() == 1) {
+          JavaFutureAction<?> futureAction = list.get(0);
+          if (futureAction.isDone()) {
+            jobInfo = new SparkJobInfo() {
+              @Override
+              public int jobId() {
+                return sparkJobId;
+              }
+
+              @Override
+              public int[] stageIds() {
+                return new int[0];
+              }
+
+              @Override
+              public JobExecutionStatus status() {
+                return JobExecutionStatus.SUCCEEDED;
+              }
+            };
+          }
+        }
+      }
+      if (jobInfo == null) {
+        jobInfo = new SparkJobInfo() {
+          @Override
+          public int jobId() {
+            return -1;
+          }
+
+          @Override
+          public int[] stageIds() {
+            return new int[0];
+          }
+
+          @Override
+          public JobExecutionStatus status() {
+            return JobExecutionStatus.UNKNOWN;
+          }
+        };
+      }
+      return new HiveSparkJobInfo(jobInfo);
+    }
+  }
+
+  private static class GetStageInfoJob implements Job<HiveSparkStageInfo> {
+    private final int stageId;
+
+    GetStageInfoJob(int stageId) {
+      this.stageId = stageId;
+    }
+
+    @Override
+    public HiveSparkStageInfo call(JobContext jc) throws Exception {
+      SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageId);
+      return stageInfo != null ? new HiveSparkStageInfo(stageInfo) : new HiveSparkStageInfo();
+    }
+  }
+}

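Because the remote client has no direct handle on the SparkContext, RemoteSparkJobStatus fetches state by shipping a small Job through the spark-client channel and blocking on its handle, exactly as GetJobInfoJob and GetStageInfoJob do above. The same round-trip pattern with a hypothetical probe job (the class name and its payload are illustrative, not part of this commit):

    import org.apache.hive.spark.client.Job;
    import org.apache.hive.spark.client.JobContext;
    import org.apache.hive.spark.client.JobHandle;
    import org.apache.hive.spark.client.SparkClient;

    public class RemoteProbeSketch {
      // Hypothetical probe: runs inside the remote driver and reports how many
      // client jobs are currently being monitored there.
      static class MonitoredJobCountJob implements Job<Integer> {
        @Override
        public Integer call(JobContext jc) throws Exception {
          return jc.getMonitoredJobs().size();
        }
      }

      static int fetchMonitoredJobCount(SparkClient client) throws Exception {
        JobHandle<Integer> handle = client.submit(new MonitoredJobCountJob());
        return handle.get(); // blocks until the remote driver replies
      }
    }
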
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Tue Nov 25 04:18:26 2014
@@ -22,6 +22,9 @@ import org.apache.spark.api.java.JavaSpa
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 
+import java.util.List;
+import java.util.Map;
+
 /**
  * Holds runtime information about the job execution context.
  *
@@ -42,4 +45,9 @@ public interface JobContext {
    */
   <T> JavaFutureAction<T> monitor(JavaFutureAction<T> job);
 
+  /**
+   * Returns a map from client job ID to the corresponding JavaFutureActions.
+   */
+  Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
+
 }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Tue Nov 25 04:18:26 2014
@@ -20,14 +20,20 @@ package org.apache.hive.spark.client;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 class JobContextImpl implements JobContext {
 
   private final JavaSparkContext sc;
   private final ThreadLocal<MonitorCallback> monitorCb;
+  private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
 
   public JobContextImpl(JavaSparkContext sc) {
     this.sc = sc;
     this.monitorCb = new ThreadLocal<MonitorCallback>();
+    monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
   }
 
 
@@ -42,11 +48,17 @@ class JobContextImpl implements JobConte
     return job;
   }
 
+  @Override
+  public Map<String, List<JavaFutureAction<?>>> getMonitoredJobs() {
+    return monitoredJobs;
+  }
+
   void setMonitorCb(MonitorCallback cb) {
     monitorCb.set(cb);
   }
 
   void stop() {
+    monitoredJobs.clear();
     sc.stop();
   }
 

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Tue Nov 25 04:18:26 2014
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -42,6 +43,11 @@ public interface JobHandle<T extends Ser
    */
   MetricsCollection getMetrics();
 
+  /**
+   * Gets the Spark job IDs corresponding to this job.
+   */
+  List<Integer> getSparkJobIds();
+
   // TODO: expose job status?
 
 }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Tue Nov 25 04:18:26 2014
@@ -18,6 +18,8 @@
 package org.apache.hive.spark.client;
 
 import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
@@ -38,6 +40,8 @@ class JobHandleImpl<T extends Serializab
   private T result;
   private Throwable error;
 
+  private final List<Integer> sparkJobIds;
+
   JobHandleImpl(SparkClientImpl client, String jobId) {
     this.client = client;
     this.jobId = jobId;
@@ -45,6 +49,7 @@ class JobHandleImpl<T extends Serializab
     this.metrics = new MetricsCollection();
     this.cancelled = new AtomicBoolean();
     this.completed = false;
+    this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
   }
 
   /** Requests a running job to be cancelled. */
@@ -103,6 +108,11 @@ class JobHandleImpl<T extends Serializab
     return metrics;
   }
 
+  @Override
+  public List<Integer> getSparkJobIds() {
+    return sparkJobIds;
+  }
+
   private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException {
     long deadline = System.currentTimeMillis() + timeout;
     synchronized (monitor) {

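Note that the Spark job IDs reach the handle asynchronously: the remote driver sends one Protocol.JobSubmitted message per submitted Spark job (see RemoteDriver and SparkClientImpl below), and the client appends the ID to this list on receipt. Callers therefore have to tolerate an empty list right after submission; a brief sketch:

    import java.util.List;

    import org.apache.hive.spark.client.JobHandle;

    public class SparkJobIdSketch {
      // Returns the first Spark job ID reported for this client job, or null
      // if no JobSubmitted message has arrived yet.
      static Integer firstSparkJobId(JobHandle<?> handle) {
        List<Integer> ids = handle.getSparkJobIds();
        return ids.isEmpty() ? null : ids.get(0);
      }
    }
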
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java Tue Nov 25 04:18:26 2014
@@ -125,4 +125,21 @@ final class Protocol {
 
   }
 
+  /**
+   * Informs the client that a new Spark job has been submitted for the client job.
+   */
+  static class JobSubmitted implements Serializable {
+    final String clientJobId;
+    final int sparkJobId;
+
+    JobSubmitted(String clientJobId, int sparkJobId) {
+      this.clientJobId = clientJobId;
+      this.sparkJobId = sparkJobId;
+    }
+
+    JobSubmitted() {
+      this(null, -1);
+    }
+  }
+
 }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Tue Nov 25 04:18:26 2014
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
@@ -216,7 +217,7 @@ public class RemoteDriver {
 
         T result = req.job.call(jc);
         synchronized (completed) {
-          while (completed.get() != jobs.size()) {
+          while (completed.get() < jobs.size()) {
             LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
                 req.id, completed.get(), jobs.size());
             completed.wait();
@@ -249,6 +250,11 @@ public class RemoteDriver {
 
     private void monitorJob(JavaFutureAction<?> job) {
       jobs.add(job);
+      if (!jc.getMonitoredJobs().containsKey(req.id)) {
+        jc.getMonitoredJobs().put(req.id, new CopyOnWriteArrayList<JavaFutureAction<?>>());
+      }
+      jc.getMonitoredJobs().get(req.id).add(job);
+      client.tell(new Protocol.JobSubmitted(req.id, job.jobIds().get(0)), actor);
     }
 
   }

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Nov 25 04:18:26 2014
@@ -319,6 +319,17 @@ class SparkClientImpl implements SparkCl
         } else {
           LOG.warn("Received result for unknown job {}", jr.id);
         }
+      } else if (message instanceof Protocol.JobSubmitted) {
+        Protocol.JobSubmitted jobSubmitted = (Protocol.JobSubmitted) message;
+        JobHandleImpl<?> handle = jobs.get(jobSubmitted.clientJobId);
+        if (handle != null) {
+          LOG.info("Received spark job ID: {} for {}",
+              jobSubmitted.sparkJobId, jobSubmitted.clientJobId);
+          handle.getSparkJobIds().add(jobSubmitted.sparkJobId);
+        } else {
+          LOG.warn("Received spark job ID: {} for unknown job {}",
+              jobSubmitted.sparkJobId, jobSubmitted.clientJobId);
+        }
       }
     }
 

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java?rev=1641525&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java Tue Nov 25 04:18:26 2014
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.status;
+
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+
+import java.io.Serializable;
+
+/**
+ * Serializable wrapper of SparkJobInfo.
+ */
+public class HiveSparkJobInfo implements SparkJobInfo, Serializable {
+  private final int jobId;
+  private final int[] stageIds;
+  private final JobExecutionStatus status;
+
+  public HiveSparkJobInfo(SparkJobInfo jobInfo) {
+    this.jobId = jobInfo.jobId();
+    this.stageIds = jobInfo.stageIds();
+    this.status = jobInfo.status();
+  }
+
+  public HiveSparkJobInfo(int jobId, int[] stageIds, JobExecutionStatus status) {
+    this.jobId = jobId;
+    this.stageIds = stageIds;
+    this.status = status;
+  }
+
+  public HiveSparkJobInfo() {
+    this(-1, new int[0], JobExecutionStatus.UNKNOWN);
+  }
+
+  @Override
+  public int jobId() {
+    return jobId;
+  }
+
+  @Override
+  public int[] stageIds() {
+    return stageIds;
+  }
+
+  @Override
+  public JobExecutionStatus status() {
+    return status;
+  }
+
+}

Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java?rev=1641525&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java Tue Nov 25 04:18:26 2014
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.status;
+
+import org.apache.spark.SparkStageInfo;
+
+import java.io.Serializable;
+
+/**
+ * Serializable wrapper of SparkStageInfo.
+ */
+public class HiveSparkStageInfo implements SparkStageInfo, Serializable {
+  private final int stageId;
+  private final int currentAttemptId;
+  private final String name;
+  private final int numTasks;
+  private final int numActiveTasks;
+  private final int numCompletedTasks;
+  private final int numFailedTasks;
+
+  public HiveSparkStageInfo(SparkStageInfo stageInfo) {
+    stageId = stageInfo.stageId();
+    currentAttemptId = stageInfo.currentAttemptId();
+    name = stageInfo.name();
+    numTasks = stageInfo.numTasks();
+    numActiveTasks = stageInfo.numActiveTasks();
+    numCompletedTasks = stageInfo.numCompletedTasks();
+    numFailedTasks = stageInfo.numFailedTasks();
+  }
+
+  public HiveSparkStageInfo(int stageId, int currentAttemptId, String name,
+      int numTasks, int numActiveTasks, int numCompletedTasks, int numFailedTasks) {
+    this.stageId = stageId;
+    this.currentAttemptId = currentAttemptId;
+    this.name = name;
+    this.numTasks = numTasks;
+    this.numActiveTasks = numActiveTasks;
+    this.numCompletedTasks = numCompletedTasks;
+    this.numFailedTasks = numFailedTasks;
+  }
+
+  public HiveSparkStageInfo() {
+    this(-1, -1, null, -1, -1, -1, -1);
+  }
+
+  @Override
+  public int stageId() {
+    return stageId;
+  }
+
+  @Override
+  public int currentAttemptId() {
+    return currentAttemptId;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public int numTasks() {
+    return numTasks;
+  }
+
+  @Override
+  public int numActiveTasks() {
+    return numActiveTasks;
+  }
+
+  @Override
+  public int numCompletedTasks() {
+    return numCompletedTasks;
+  }
+
+  @Override
+  public int numFailedTasks() {
+    return numFailedTasks;
+  }
+
+}