You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/25 05:18:26 UTC
svn commit: r1641525 - in /hive/branches/spark:
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/
spark-client/src/main/java/org/apac...
Author: xuefu
Date: Tue Nov 25 04:18:26 2014
New Revision: 1641525
URL: http://svn.apache.org/r1641525
Log:
HIVE-8834: enable job progress monitoring of Remote Spark Context [Spark Branch] (Rui via Xuefu)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java
Removed:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Tue Nov 25 04:18:26 2014
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -142,10 +142,10 @@ public class LocalHiveSparkClient implem
JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
// We use Spark RDD async action to submit job as it's the only way to get jobId now.
JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
- // As we always use foreach action to submit RDD graph, it would only trigger on job.
+ // As we always use foreach action to submit RDD graph, it would only trigger one job.
int jobId = future.jobIds().get(0);
- SimpleSparkJobStatus sparkJobStatus =
- new SimpleSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future);
+ LocalSparkJobStatus sparkJobStatus =
+ new LocalSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future);
return new SparkJobRef(Integer.toString(jobId), sparkJobStatus);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Tue Nov 25 04:18:26 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.DriverC
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -122,8 +123,7 @@ public class RemoteHiveSparkClient imple
return null;
}
});
- jobHandle.get();
- return new SparkJobRef(jobHandle.getClientJobId());
+ return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle));
}
private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Tue Nov 25 04:18:26 2014
@@ -62,12 +62,13 @@ public class SparkJobMonitor {
int rc = 0;
JobExecutionStatus lastState = null;
Map<String, SparkStageProgress> lastProgressMap = null;
- long startTime = 0;
+ long startTime = -1;
while (true) {
try {
JobExecutionStatus state = sparkJobStatus.getState();
- if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
+ if (state != null && state != JobExecutionStatus.UNKNOWN &&
+ (state != lastState || state == JobExecutionStatus.RUNNING)) {
lastState = state;
Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
@@ -97,9 +98,13 @@ public class SparkJobMonitor {
case SUCCEEDED:
printStatus(progressMap, lastProgressMap);
lastProgressMap = progressMap;
- double duration = (System.currentTimeMillis() - startTime) / 1000.0;
- console.printInfo("Status: Finished successfully in " +
- String.format("%.2f seconds", duration));
+ if (startTime < 0) {
+ console.printInfo("Status: Finished successfully within a check interval.");
+ } else {
+ double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+ console.printInfo("Status: Finished successfully in " +
+ String.format("%.2f seconds", duration));
+ }
running = false;
done = true;
break;
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1641525&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Tue Nov 25 04:18:26 2014
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+
+import scala.Option;
+
+public class LocalSparkJobStatus implements SparkJobStatus {
+
+ private final JavaSparkContext sparkContext;
+ private int jobId;
+ // After SPARK-2321, we only use JobMetricsListener to get job metrics
+ // TODO: remove it when the new API provides equivalent functionality
+ private JobMetricsListener jobMetricsListener;
+ private SparkCounters sparkCounters;
+ private JavaFutureAction<Void> future;
+
+ public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
+ JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
+ JavaFutureAction<Void> future) {
+ this.sparkContext = sparkContext;
+ this.jobId = jobId;
+ this.jobMetricsListener = jobMetricsListener;
+ this.sparkCounters = sparkCounters;
+ this.future = future;
+ }
+
+ @Override
+ public int getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public JobExecutionStatus getState() {
+ // For spark job with empty source data, it's not submitted actually, so we would never
+ // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current
+ // job state.
+ if (future.isDone()) {
+ return JobExecutionStatus.SUCCEEDED;
+ } else {
+ // SparkJobInfo may not be available yet
+ SparkJobInfo sparkJobInfo = getJobInfo();
+ return sparkJobInfo == null ? null : sparkJobInfo.status();
+ }
+ }
+
+ @Override
+ public int[] getStageIds() {
+ SparkJobInfo sparkJobInfo = getJobInfo();
+ return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds();
+ }
+
+ @Override
+ public Map<String, SparkStageProgress> getSparkStageProgress() {
+ Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
+ for (int stageId : getStageIds()) {
+ SparkStageInfo sparkStageInfo = getStageInfo(stageId);
+ if (sparkStageInfo != null) {
+ int runningTaskCount = sparkStageInfo.numActiveTasks();
+ int completedTaskCount = sparkStageInfo.numCompletedTasks();
+ int failedTaskCount = sparkStageInfo.numFailedTasks();
+ int totalTaskCount = sparkStageInfo.numTasks();
+ SparkStageProgress sparkStageProgress = new SparkStageProgress(
+ totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+ stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
+ sparkStageInfo.currentAttemptId(), sparkStageProgress);
+ }
+ }
+ return stageProgresses;
+ }
+
+ @Override
+ public SparkCounters getCounter() {
+ return sparkCounters;
+ }
+
+ @Override
+ public SparkStatistics getSparkStatistics() {
+ SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+ // add Hive operator level statistics.
+ sparkStatisticsBuilder.add(sparkCounters);
+ // add spark job metrics.
+ String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
+ Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
+ if (jobMetric == null) {
+ return null;
+ }
+
+ Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+ for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
+ sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+ }
+
+ return sparkStatisticsBuilder.build();
+ }
+
+ @Override
+ public void cleanup() {
+ jobMetricsListener.cleanup(jobId);
+ }
+
+ private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+ Map<String, Long> results = Maps.newLinkedHashMap();
+
+ long executorDeserializeTime = 0;
+ long executorRunTime = 0;
+ long resultSize = 0;
+ long jvmGCTime = 0;
+ long resultSerializationTime = 0;
+ long memoryBytesSpilled = 0;
+ long diskBytesSpilled = 0;
+ long bytesRead = 0;
+ long remoteBlocksFetched = 0;
+ long localBlocksFetched = 0;
+ long fetchWaitTime = 0;
+ long remoteBytesRead = 0;
+ long shuffleBytesWritten = 0;
+ long shuffleWriteTime = 0;
+ boolean inputMetricExist = false;
+ boolean shuffleReadMetricExist = false;
+ boolean shuffleWriteMetricExist = false;
+
+ for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+ if (stageMetric != null) {
+ for (TaskMetrics taskMetrics : stageMetric) {
+ if (taskMetrics != null) {
+ executorDeserializeTime += taskMetrics.executorDeserializeTime();
+ executorRunTime += taskMetrics.executorRunTime();
+ resultSize += taskMetrics.resultSize();
+ jvmGCTime += taskMetrics.jvmGCTime();
+ resultSerializationTime += taskMetrics.resultSerializationTime();
+ memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+ diskBytesSpilled += taskMetrics.diskBytesSpilled();
+ if (!taskMetrics.inputMetrics().isEmpty()) {
+ inputMetricExist = true;
+ bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+ }
+ Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+ if (!shuffleReadMetricsOption.isEmpty()) {
+ shuffleReadMetricExist = true;
+ remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+ localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+ fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+ remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+ }
+ Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+ if (!shuffleWriteMetricsOption.isEmpty()) {
+ shuffleWriteMetricExist = true;
+ shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+ shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+ }
+ }
+ }
+ }
+ }
+
+ results.put("ExecutorDeserializeTime", executorDeserializeTime);
+ results.put("ExecutorRunTime", executorRunTime);
+ results.put("ResultSize", resultSize);
+ results.put("JvmGCTime", jvmGCTime);
+ results.put("ResultSerializationTime", resultSerializationTime);
+ results.put("MemoryBytesSpilled", memoryBytesSpilled);
+ results.put("DiskBytesSpilled", diskBytesSpilled);
+ if (inputMetricExist) {
+ results.put("BytesRead", bytesRead);
+ }
+ if (shuffleReadMetricExist) {
+ results.put("RemoteBlocksFetched", remoteBlocksFetched);
+ results.put("LocalBlocksFetched", localBlocksFetched);
+ results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+ results.put("FetchWaitTime", fetchWaitTime);
+ results.put("RemoteBytesRead", remoteBytesRead);
+ }
+ if (shuffleWriteMetricExist) {
+ results.put("ShuffleBytesWritten", shuffleBytesWritten);
+ results.put("ShuffleWriteTime", shuffleWriteTime);
+ }
+ return results;
+ }
+
+ private SparkJobInfo getJobInfo() {
+ return sparkContext.statusTracker().getJobInfo(jobId);
+ }
+
+ private SparkStageInfo getStageInfo(int stageId) {
+ return sparkContext.statusTracker().getStageInfo(stageId);
+ }
+}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1641525&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Tue Nov 25 04:18:26 2014
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.hive.spark.client.Job;
+import org.apache.hive.spark.client.JobContext;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.client.status.HiveSparkJobInfo;
+import org.apache.hive.spark.client.status.HiveSparkStageInfo;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used with remote spark client.
+ */
+public class RemoteSparkJobStatus implements SparkJobStatus {
+ private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
+ private final SparkClient sparkClient;
+ private final JobHandle<Serializable> jobHandle;
+
+ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle) {
+ this.sparkClient = sparkClient;
+ this.jobHandle = jobHandle;
+ }
+
+ @Override
+ public int getJobId() {
+ return jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : -1;
+ }
+
+ @Override
+ public JobExecutionStatus getState() {
+ SparkJobInfo sparkJobInfo = getSparkJobInfo();
+ return sparkJobInfo != null ? sparkJobInfo.status() : JobExecutionStatus.UNKNOWN;
+ }
+
+ @Override
+ public int[] getStageIds() {
+ SparkJobInfo sparkJobInfo = getSparkJobInfo();
+ return sparkJobInfo != null ? sparkJobInfo.stageIds() : new int[0];
+ }
+
+ @Override
+ public Map<String, SparkStageProgress> getSparkStageProgress() {
+ Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
+ for (int stageId : getStageIds()) {
+ SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId);
+ if (sparkStageInfo != null && sparkStageInfo.name() != null) {
+ int runningTaskCount = sparkStageInfo.numActiveTasks();
+ int completedTaskCount = sparkStageInfo.numCompletedTasks();
+ int failedTaskCount = sparkStageInfo.numFailedTasks();
+ int totalTaskCount = sparkStageInfo.numTasks();
+ SparkStageProgress sparkStageProgress = new SparkStageProgress(
+ totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+ stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
+ sparkStageInfo.currentAttemptId(), sparkStageProgress);
+ }
+ }
+ return stageProgresses;
+ }
+
+ @Override
+ public SparkCounters getCounter() {
+ return null;
+ }
+
+ @Override
+ public SparkStatistics getSparkStatistics() {
+ return null;
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+
+ private SparkJobInfo getSparkJobInfo() {
+ Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
+ jobHandle.getSparkJobIds().get(0) : null;
+ if (sparkJobId == null) {
+ return null;
+ }
+ JobHandle<HiveSparkJobInfo> getJobInfo = sparkClient.submit(
+ new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
+ try {
+ return getJobInfo.get();
+ } catch (Throwable t) {
+ LOG.warn("Error getting job info", t);
+ return null;
+ }
+ }
+
+ private SparkStageInfo getSparkStageInfo(int stageId) {
+ JobHandle<HiveSparkStageInfo> getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId));
+ try {
+ return getStageInfo.get();
+ } catch (Throwable t) {
+ LOG.warn("Error getting stage info", t);
+ return null;
+ }
+ }
+
+ private static class GetJobInfoJob implements Job<HiveSparkJobInfo> {
+ private final String clientJobId;
+ private final int sparkJobId;
+
+ GetJobInfoJob(String clientJobId, int sparkJobId) {
+ this.clientJobId = clientJobId;
+ this.sparkJobId = sparkJobId;
+ }
+
+ @Override
+ public HiveSparkJobInfo call(JobContext jc) throws Exception {
+ SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+ if (jobInfo == null) {
+ List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId);
+ if (list != null && list.size() == 1) {
+ JavaFutureAction<?> futureAction = list.get(0);
+ if (futureAction.isDone()) {
+ jobInfo = new SparkJobInfo() {
+ @Override
+ public int jobId() {
+ return sparkJobId;
+ }
+
+ @Override
+ public int[] stageIds() {
+ return new int[0];
+ }
+
+ @Override
+ public JobExecutionStatus status() {
+ return JobExecutionStatus.SUCCEEDED;
+ }
+ };
+ }
+ }
+ }
+ if(jobInfo == null) {
+ jobInfo = new SparkJobInfo() {
+ @Override
+ public int jobId() {
+ return -1;
+ }
+
+ @Override
+ public int[] stageIds() {
+ return new int[0];
+ }
+
+ @Override
+ public JobExecutionStatus status() {
+ return JobExecutionStatus.UNKNOWN;
+ }
+ };
+ }
+ return new HiveSparkJobInfo(jobInfo);
+ }
+ }
+
+ private static class GetStageInfoJob implements Job<HiveSparkStageInfo>{
+ private final int stageId;
+
+ GetStageInfoJob(int stageId){
+ this.stageId=stageId;
+ }
+
+ @Override
+ public HiveSparkStageInfo call(JobContext jc) throws Exception {
+ SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageId);
+ return stageInfo != null ? new HiveSparkStageInfo(stageInfo) : new HiveSparkStageInfo();
+ }
+ }
+}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Tue Nov 25 04:18:26 2014
@@ -22,6 +22,9 @@ import org.apache.spark.api.java.JavaSpa
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import java.util.List;
+import java.util.Map;
+
/**
* Holds runtime information about the job execution context.
*
@@ -42,4 +45,9 @@ public interface JobContext {
*/
<T> JavaFutureAction<T> monitor(JavaFutureAction<T> job);
+ /**
+ * Return a map from client job Id to corresponding JavaFutureActions
+ */
+ Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
+
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Tue Nov 25 04:18:26 2014
@@ -20,14 +20,20 @@ package org.apache.hive.spark.client;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaSparkContext;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
class JobContextImpl implements JobContext {
private final JavaSparkContext sc;
private final ThreadLocal<MonitorCallback> monitorCb;
+ private final Map<String, List<JavaFutureAction<?>>> monitoredJobs;
public JobContextImpl(JavaSparkContext sc) {
this.sc = sc;
this.monitorCb = new ThreadLocal<MonitorCallback>();
+ monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>();
}
@@ -42,11 +48,17 @@ class JobContextImpl implements JobConte
return job;
}
+ @Override
+ public Map<String, List<JavaFutureAction<?>>> getMonitoredJobs() {
+ return monitoredJobs;
+ }
+
void setMonitorCb(MonitorCallback cb) {
monitorCb.set(cb);
}
void stop() {
+ monitoredJobs.clear();
sc.stop();
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Tue Nov 25 04:18:26 2014
@@ -18,6 +18,7 @@
package org.apache.hive.spark.client;
import java.io.Serializable;
+import java.util.List;
import java.util.concurrent.Future;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -42,6 +43,11 @@ public interface JobHandle<T extends Ser
*/
MetricsCollection getMetrics();
+ /**
+ * Get corresponding spark job IDs for this job
+ */
+ List<Integer> getSparkJobIds();
+
// TODO: expose job status?
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Tue Nov 25 04:18:26 2014
@@ -18,6 +18,8 @@
package org.apache.hive.spark.client;
import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
@@ -38,6 +40,8 @@ class JobHandleImpl<T extends Serializab
private T result;
private Throwable error;
+ private final List<Integer> sparkJobIds;
+
JobHandleImpl(SparkClientImpl client, String jobId) {
this.client = client;
this.jobId = jobId;
@@ -45,6 +49,7 @@ class JobHandleImpl<T extends Serializab
this.metrics = new MetricsCollection();
this.cancelled = new AtomicBoolean();
this.completed = false;
+ this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
}
/** Requests a running job to be cancelled. */
@@ -103,6 +108,11 @@ class JobHandleImpl<T extends Serializab
return metrics;
}
+ @Override
+ public List<Integer> getSparkJobIds() {
+ return sparkJobIds;
+ }
+
private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException {
long deadline = System.currentTimeMillis() + timeout;
synchronized (monitor) {
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java Tue Nov 25 04:18:26 2014
@@ -125,4 +125,21 @@ final class Protocol {
}
+ /**
+ * Inform the client that a new spark job has been submitted for the client job
+ */
+ static class JobSubmitted implements Serializable {
+ final String clientJobId;
+ final int sparkJobId;
+
+ JobSubmitted(String clientJobId, int sparkJobId) {
+ this.clientJobId = clientJobId;
+ this.sparkJobId = sparkJobId;
+ }
+
+ JobSubmitted() {
+ this(null, -1);
+ }
+ }
+
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Tue Nov 25 04:18:26 2014
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
@@ -216,7 +217,7 @@ public class RemoteDriver {
T result = req.job.call(jc);
synchronized (completed) {
- while (completed.get() != jobs.size()) {
+ while (completed.get() < jobs.size()) {
LOG.debug("Client job {} finished, {} of {} Spark jobs finished.",
req.id, completed.get(), jobs.size());
completed.wait();
@@ -249,6 +250,11 @@ public class RemoteDriver {
private void monitorJob(JavaFutureAction<?> job) {
jobs.add(job);
+ if (!jc.getMonitoredJobs().containsKey(req.id)) {
+ jc.getMonitoredJobs().put(req.id, new CopyOnWriteArrayList<JavaFutureAction<?>>());
+ }
+ jc.getMonitoredJobs().get(req.id).add(job);
+ client.tell(new Protocol.JobSubmitted(req.id, job.jobIds().get(0)), actor);
}
}
Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1641525&r1=1641524&r2=1641525&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Tue Nov 25 04:18:26 2014
@@ -319,6 +319,17 @@ class SparkClientImpl implements SparkCl
} else {
LOG.warn("Received result for unknown job {}", jr.id);
}
+ } else if (message instanceof Protocol.JobSubmitted) {
+ Protocol.JobSubmitted jobSubmitted = (Protocol.JobSubmitted) message;
+ JobHandleImpl<?> handle = jobs.get(jobSubmitted.clientJobId);
+ if (handle != null) {
+ LOG.info("Received spark job ID: {} for {}",
+ jobSubmitted.sparkJobId, jobSubmitted.clientJobId);
+ handle.getSparkJobIds().add(jobSubmitted.sparkJobId);
+ } else {
+ LOG.warn("Received spark job ID: {} for unknown job {}",
+ jobSubmitted.sparkJobId, jobSubmitted.clientJobId);
+ }
}
}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java?rev=1641525&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java Tue Nov 25 04:18:26 2014
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.status;
+
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+
+import java.io.Serializable;
+
+/**
+ * Wrapper of SparkJobInfo
+ */
+/**
+ * Serializable snapshot of a {@link SparkJobInfo}, used to ship job status
+ * from the remote Spark driver back to the Hive client over the wire.
+ */
+public class HiveSparkJobInfo implements SparkJobInfo, Serializable {
+  // Fixed UID so client and driver binaries built by different compilers
+  // can still deserialize each other's instances.
+  private static final long serialVersionUID = 1L;
+
+  private final int jobId;
+  private final int[] stageIds;
+  private final JobExecutionStatus status;
+
+  /**
+   * Copies the state out of a live {@link SparkJobInfo}.
+   *
+   * @param jobInfo the source job info; must not be null
+   */
+  public HiveSparkJobInfo(SparkJobInfo jobInfo) {
+    this(jobInfo.jobId(), jobInfo.stageIds(), jobInfo.status());
+  }
+
+  /**
+   * @param jobId Spark job id, or -1 if unknown
+   * @param stageIds ids of the stages belonging to this job; defensively copied
+   * @param status current execution status of the job
+   */
+  public HiveSparkJobInfo(int jobId, int[] stageIds, JobExecutionStatus status) {
+    this.jobId = jobId;
+    // Defensive copy: don't let the caller mutate our internal array.
+    this.stageIds = stageIds == null ? new int[0] : stageIds.clone();
+    this.status = status;
+  }
+
+  /** Placeholder instance meaning "job info not available". */
+  public HiveSparkJobInfo() {
+    this(-1, new int[0], JobExecutionStatus.UNKNOWN);
+  }
+
+  @Override
+  public int jobId() {
+    return jobId;
+  }
+
+  @Override
+  public int[] stageIds() {
+    // Return a copy so callers cannot modify our internal state.
+    return stageIds.clone();
+  }
+
+  @Override
+  public JobExecutionStatus status() {
+    return status;
+  }
+
+}
Added: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java?rev=1641525&view=auto
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java (added)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java Tue Nov 25 04:18:26 2014
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client.status;
+
+import org.apache.spark.SparkStageInfo;
+
+import java.io.Serializable;
+
+/**
+ * Wrapper of SparkStageInfo
+ */
+/**
+ * Serializable snapshot of a {@link SparkStageInfo}, used to ship per-stage
+ * progress from the remote Spark driver back to the Hive client.
+ */
+public class HiveSparkStageInfo implements SparkStageInfo, Serializable {
+  // Fixed UID so client and driver binaries built by different compilers
+  // can still deserialize each other's instances.
+  private static final long serialVersionUID = 1L;
+
+  private final int stageId;
+  private final int currentAttemptId;
+  private final String name;
+  private final int numTasks;
+  private final int numActiveTasks;
+  private final int numCompletedTasks;
+  private final int numFailedTasks;
+
+  /**
+   * Copies the state out of a live {@link SparkStageInfo}.
+   *
+   * @param stageInfo the source stage info; must not be null
+   */
+  public HiveSparkStageInfo(SparkStageInfo stageInfo) {
+    this(stageInfo.stageId(), stageInfo.currentAttemptId(), stageInfo.name(),
+        stageInfo.numTasks(), stageInfo.numActiveTasks(),
+        stageInfo.numCompletedTasks(), stageInfo.numFailedTasks());
+  }
+
+  /**
+   * @param stageId Spark stage id, or -1 if unknown
+   * @param currentAttemptId current attempt number of the stage
+   * @param name stage name; may be null for the placeholder instance
+   * @param numTasks total number of tasks in the stage
+   * @param numActiveTasks tasks currently running
+   * @param numCompletedTasks tasks finished successfully
+   * @param numFailedTasks tasks that failed
+   */
+  public HiveSparkStageInfo(int stageId, int currentAttemptId, String name,
+      int numTasks, int numActiveTasks, int numCompletedTasks, int numFailedTasks) {
+    this.stageId = stageId;
+    this.currentAttemptId = currentAttemptId;
+    this.name = name;
+    this.numTasks = numTasks;
+    this.numActiveTasks = numActiveTasks;
+    this.numCompletedTasks = numCompletedTasks;
+    this.numFailedTasks = numFailedTasks;
+  }
+
+  /**
+   * Placeholder instance meaning "stage info not available".
+   * NOTE(review): name() is null for this instance — callers must null-check
+   * before dereferencing; verify against RemoteSparkJobStatus usage.
+   */
+  public HiveSparkStageInfo() {
+    this(-1, -1, null, -1, -1, -1, -1);
+  }
+
+  @Override
+  public int stageId() {
+    return stageId;
+  }
+
+  @Override
+  public int currentAttemptId() {
+    return currentAttemptId;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public int numTasks() {
+    return numTasks;
+  }
+
+  @Override
+  public int numActiveTasks() {
+    return numActiveTasks;
+  }
+
+  @Override
+  public int numCompletedTasks() {
+    return numCompletedTasks;
+  }
+
+  @Override
+  public int numFailedTasks() {
+    return numFailedTasks;
+  }
+
+}