Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/14 04:53:44 UTC
svn commit: r1631607 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: ./
status/ status/impl/
Author: xuefu
Date: Tue Oct 14 02:53:43 2014
New Revision: 1631607
URL: http://svn.apache.org/r1631607
Log:
HIVE-7439: Spark job monitoring and error reporting [Spark Branch] (Chengxiang via Xuefu)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1631607&r1=1631606&r2=1631607&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Tue Oct 14 02:53:43 2014
@@ -29,12 +29,21 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.FutureAction;
+import org.apache.spark.SimpleFutureAction;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.ui.jobs.JobProgressListener;
import java.io.IOException;
import java.io.InputStream;
@@ -69,8 +78,17 @@ public class SparkClient implements Seri
private List<String> localFiles = new ArrayList<String>();
+ private JobStateListener jobStateListener;
+
+ private JobProgressListener jobProgressListener;
+
private SparkClient(Configuration hiveConf) {
- sc = new JavaSparkContext(initiateSparkConf(hiveConf));
+ SparkConf sparkConf = initiateSparkConf(hiveConf);
+ sc = new JavaSparkContext(sparkConf);
+ jobStateListener = new JobStateListener();
+ jobProgressListener = new JobProgressListener(sparkConf);
+ sc.sc().listenerBus().addListener(jobStateListener);
+ sc.sc().listenerBus().addListener(jobProgressListener);
}
private SparkConf initiateSparkConf(Configuration hiveConf) {
@@ -161,7 +179,15 @@ public class SparkClient implements Seri
// Execute generated plan.
try {
- plan.execute();
+ JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
+ // We use Spark's RDD async action to submit the job, as it's currently the only way to get the jobId.
+ FutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+ // An action may trigger multiple jobs in Spark; we only monitor the latest job here
+ // until we find that Hive does trigger multiple jobs.
+ SimpleSparkJobStatus sparkJobStatus = new SimpleSparkJobStatus(
+ (Integer) future.jobIds().last(), jobStateListener, jobProgressListener);
+ SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
+ monitor.startMonitor();
} catch (Exception e) {
LOG.error("Error executing Spark Plan", e);
return 1;
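Note that the return code of startMonitor() is not consumed in the hunk above. A minimal sketch of propagating it to the caller (hypothetical, not part of this revision; the return codes come from SparkJobMonitor below, where 0 means success, 1 killed, 2 failed/error):

    // Hypothetical follow-up: surface the monitor's result instead of dropping it.
    int rc = monitor.startMonitor();
    if (rc != 0) {
      return rc;
    }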
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1631607&r1=1631606&r2=1631607&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Tue Oct 14 02:53:43 2014
@@ -38,7 +38,7 @@ public class SparkPlan {
private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
- public void execute() throws IllegalStateException {
+ public JavaPairRDD<HiveKey, BytesWritable> generateGraph() throws IllegalStateException {
Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToOutputRDDMap
= new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
for (SparkTran tran : getAllTrans()) {
@@ -74,7 +74,7 @@ public class SparkPlan {
}
}
- finalRDD.foreach(HiveVoidFunction.getInstance());
+ return finalRDD;
}
public void addTran(SparkTran tran) {
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+/**
+ * SparkJobMonitor monitors the status of a single Spark job in a loop until the job finishes, fails, or is killed.
+ * It prints the current job status to the console and sleeps the current thread between monitoring intervals.
+ */
+public class SparkJobMonitor {
+
+ private static final String CLASS_NAME = SparkJobMonitor.class.getName();
+ private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ private transient LogHelper console;
+ private final int checkInterval = 200;
+ private final int maxRetryInterval = 2500;
+ private final int printInterval = 3000;
+ private long lastPrintTime;
+ private Set<String> completed;
+
+ private SparkJobStatus sparkJobStatus;
+
+ public SparkJobMonitor(SparkJobStatus sparkJobStatus) {
+ this.sparkJobStatus = sparkJobStatus;
+ console = new LogHelper(LOG);
+ }
+
+ public int startMonitor() {
+ completed = new HashSet<String>();
+
+ boolean running = false;
+ boolean done = false;
+ int failedCounter = 0;
+ int rc = 0;
+ SparkJobState lastState = null;
+ String lastReport = null;
+ long startTime = 0;
+
+ while(true) {
+
+ try {
+ Map<String, SparkProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+ SparkJobState state = sparkJobStatus.getState();
+
+ if (state != lastState || state == SparkJobState.RUNNING) {
+ lastState = state;
+
+ switch(state) {
+ case SUBMITTED:
+ console.printInfo("Status: Submitted");
+ break;
+ case INITING:
+ console.printInfo("Status: Initializing");
+ break;
+ case RUNNING:
+ if (!running) {
+ // print job stages.
+ console.printInfo("\nQuery Hive on Spark job[" +
+ sparkJobStatus.getJobId() + "] stages:");
+ for (int stageId : sparkJobStatus.getStageIds()) {
+ console.printInfo(Integer.toString(stageId));
+ }
+
+ console.printInfo("\nStatus: Running (Hive on Spark job[" +
+ sparkJobStatus.getJobId() + "])\n");
+ startTime = System.currentTimeMillis();
+ running = true;
+ }
+
+ lastReport = printStatus(progressMap, lastReport, console);
+ break;
+ case SUCCEEDED:
+ lastReport = printStatus(progressMap, lastReport, console);
+ double duration = (System.currentTimeMillis() - startTime)/1000.0;
+ console.printInfo("Status: Finished successfully in " +
+ String.format("%.2f seconds", duration));
+ running = false;
+ done = true;
+ break;
+ case KILLED:
+ console.printInfo("Status: Killed");
+ running = false;
+ done = true;
+ rc = 1;
+ break;
+ case FAILED:
+ case ERROR:
+ console.printError("Status: Failed");
+ running = false;
+ done = true;
+ rc = 2;
+ break;
+ }
+ }
+ if (!done) {
+ Thread.sleep(checkInterval);
+ }
+ } catch (Exception e) {
+ console.printInfo("Exception: "+e.getMessage());
+ if (++failedCounter % (maxRetryInterval / checkInterval) == 0
+ || e instanceof InterruptedException) {
+ console.printInfo("Killing Job...");
+ console.printError("Execution has failed.");
+ rc = 1;
+ done = true;
+ } else {
+ console.printInfo("Retrying...");
+ }
+ } finally {
+ if (done) {
+ break;
+ }
+ }
+ }
+ return rc;
+ }
+
+ private String printStatus(
+ Map<String, SparkProgress> progressMap,
+ String lastReport,
+ LogHelper console) {
+
+ StringBuffer reportBuffer = new StringBuffer();
+
+ SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+ for (String s: keys) {
+ SparkProgress progress = progressMap.get(s);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ final int running = progress.getRunningTaskCount();
+ final int failed = progress.getFailedTaskCount();
+ if (total <= 0) {
+ reportBuffer.append(String.format("%s: -/-\t", s, complete, total));
+ } else {
+ if (complete == total && !completed.contains(s)) {
+ completed.add(s);
+ }
+ if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
+ /* stage is started, but not complete */
+ if (failed > 0) {
+ reportBuffer.append(
+ String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+ }
+ } else {
+ /* stage is waiting for input/slots or complete */
+ if (failed > 0) {
+ /* tasks finished but some failed */
+ reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+ }
+ }
+ }
+ }
+
+ String report = reportBuffer.toString();
+ if (!report.equals(lastReport)
+ || System.currentTimeMillis() >= lastPrintTime + printInterval) {
+ console.printInfo(report);
+ lastPrintTime = System.currentTimeMillis();
+ }
+
+ return report;
+ }
+}
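For reference, printStatus() renders each stage as "stage: completed(+running,-failed)/total", keyed by "stageId_attemptId" (see SimpleSparkJobStatus below). A small illustration of the format string in isolation, with made-up values:

    // Illustration only: 2 tasks succeeded, 3 running, 1 failed, out of 10, for stage 3 attempt 0.
    String line = String.format("%s: %d(+%d,-%d)/%d\t", "3_0", 2, 3, 1, 10);
    // line is "3_0: 2(+3,-1)/10" followed by a tab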
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+public enum SparkJobState {
+
+ SUBMITTED,
+ INITING,
+ RUNNING,
+ SUCCEEDED,
+ KILLED,
+ FAILED,
+ ERROR,
+}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.Map;
+
+/**
+ * SparkJobStatus identifies what Hive wants to know about the status of a Spark job.
+ */
+public interface SparkJobStatus {
+
+ public int getJobId();
+
+ public SparkJobState getState();
+
+ public SparkProgress getSparkJobProgress();
+
+ public int[] getStageIds();
+
+ public Map<String, SparkProgress> getSparkStageProgress();
+
+}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+public class SparkProgress {
+
+ private int totalTaskCount;
+ private int succeededTaskCount;
+ private int runningTaskCount;
+ private int failedTaskCount;
+ private int killedTaskCount;
+
+ public SparkProgress(
+ int totalTaskCount,
+ int succeededTaskCount,
+ int runningTaskCount,
+ int failedTaskCount,
+ int killedTaskCount) {
+
+ this.totalTaskCount = totalTaskCount;
+ this.succeededTaskCount = succeededTaskCount;
+ this.runningTaskCount = runningTaskCount;
+ this.failedTaskCount = failedTaskCount;
+ this.killedTaskCount = killedTaskCount;
+ }
+
+ public int getTotalTaskCount() {
+ return totalTaskCount;
+ }
+
+ public int getSucceededTaskCount() {
+ return succeededTaskCount;
+ }
+
+ public int getRunningTaskCount() {
+ return runningTaskCount;
+ }
+
+ public int getFailedTaskCount() {
+ return failedTaskCount;
+ }
+
+ public int getKilledTaskCount() {
+ return killedTaskCount;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof SparkProgress) {
+ SparkProgress other = (SparkProgress) obj;
+ return getTotalTaskCount() == other.getTotalTaskCount()
+ && getSucceededTaskCount() == other.getSucceededTaskCount()
+ && getRunningTaskCount() == other.getRunningTaskCount()
+ && getFailedTaskCount() == other.getFailedTaskCount()
+ && getKilledTaskCount() == other.getKilledTaskCount();
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("TotalTasks: ");
+ sb.append(getTotalTaskCount());
+ sb.append(" Succeeded: ");
+ sb.append(getSucceededTaskCount());
+ sb.append(" Running: ");
+ sb.append(getRunningTaskCount());
+ sb.append(" Failed: ");
+ sb.append(getFailedTaskCount());
+ sb.append(" Killed: ");
+ sb.append(getKilledTaskCount());
+ return sb.toString();
+ }
+}
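SparkProgress overrides equals() but not hashCode(), so instances would be inconsistent as hash-based map keys. A hashCode() consistent with the equals() above might look like this (a sketch, not part of this revision):

    @Override
    public int hashCode() {
      // Combine the same fields that equals() compares.
      int result = totalTaskCount;
      result = 31 * result + succeededTaskCount;
      result = 31 * result + runningTaskCount;
      result = 31 * result + failedTaskCount;
      result = 31 * result + killedTaskCount;
      return result;
    }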
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.spark.scheduler.JobSucceeded;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+
+import scala.collection.JavaConversions;
+
+public class JobStateListener implements SparkListener {
+
+ private Map<Integer, SparkJobState> jobIdToStates = new HashMap<Integer, SparkJobState>();
+ private Map<Integer, int[]> jobIdToStageId = new HashMap<Integer, int[]>();
+
+ @Override
+ public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+
+ }
+
+ @Override
+ public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+
+ }
+
+ @Override
+ public void onTaskStart(SparkListenerTaskStart taskStart) {
+
+ }
+
+ @Override
+ public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+
+ }
+
+ @Override
+ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+
+ }
+
+ @Override
+ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
+ jobIdToStates.put(jobStart.jobId(), SparkJobState.RUNNING);
+ List<Object> ids = JavaConversions.asJavaList(jobStart.stageIds());
+ int[] intStageIds = new int[ids.size()];
+ for(int i=0; i<ids.size(); i++) {
+ intStageIds[i] = (Integer)ids.get(i);
+ }
+ jobIdToStageId.put(jobStart.jobId(), intStageIds);
+ }
+
+ @Override
+ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
+ // JobSucceeded is a Scala singleton object, so we need to append a dollar sign to its class name.
+ if (jobEnd.jobResult().getClass().getName().equals(JobSucceeded.class.getName() + "$")) {
+ jobIdToStates.put(jobEnd.jobId(), SparkJobState.SUCCEEDED);
+ } else {
+ jobIdToStates.put(jobEnd.jobId(), SparkJobState.FAILED);
+ }
+ }
+
+ @Override
+ public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+
+ }
+
+ @Override
+ public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+
+ }
+
+ @Override
+ public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+
+ }
+
+ @Override
+ public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+
+ }
+
+ @Override
+ public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+
+ }
+
+ @Override
+ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+
+ }
+
+ @Override
+ public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+
+ }
+
+ public synchronized SparkJobState getJobState(int jobId) {
+ return jobIdToStates.get(jobId);
+ }
+
+ public synchronized int[] getStageIds(int jobId) {
+ return jobIdToStageId.get(jobId);
+ }
+}
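The dollar-sign comparison in onJobEnd() works because Scala compiles the JobSucceeded case object into a class named JobSucceeded$. Assuming that generated singleton class is visible from Java, an instanceof check would be an equivalent alternative (a sketch, not part of this revision):

    // Equivalent check via the Scala-generated singleton class (assumption:
    // org.apache.spark.scheduler.JobSucceeded$ is accessible on the classpath).
    if (jobEnd.jobResult() instanceof org.apache.spark.scheduler.JobSucceeded$) {
      jobIdToStates.put(jobEnd.jobId(), SparkJobState.SUCCEEDED);
    } else {
      jobIdToStates.put(jobEnd.jobId(), SparkJobState.FAILED);
    }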
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java?rev=1631607&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java Tue Oct 14 02:53:43 2014
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkProgress;
+import org.apache.spark.scheduler.StageInfo;
+import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.spark.ui.jobs.UIData;
+
+import scala.Tuple2;
+
+import static scala.collection.JavaConversions.bufferAsJavaList;
+import static scala.collection.JavaConversions.mutableMapAsJavaMap;
+
+public class SimpleSparkJobStatus implements SparkJobStatus {
+
+ private int jobId;
+ private JobStateListener jobStateListener;
+ private JobProgressListener jobProgressListener;
+
+ public SimpleSparkJobStatus(
+ int jobId,
+ JobStateListener stateListener,
+ JobProgressListener progressListener) {
+
+ this.jobId = jobId;
+ this.jobStateListener = stateListener;
+ this.jobProgressListener = progressListener;
+ }
+
+ @Override
+ public int getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public SparkJobState getState() {
+ return jobStateListener.getJobState(jobId);
+ }
+
+ @Override
+ public SparkProgress getSparkJobProgress() {
+ Map<String, SparkProgress> stageProgresses = getSparkStageProgress();
+
+ int totalTaskCount = 0;
+ int runningTaskCount = 0;
+ int completedTaskCount = 0;
+ int failedTaskCount = 0;
+ int killedTaskCount = 0;
+
+ for (SparkProgress sparkProgress : stageProgresses.values()) {
+ totalTaskCount += sparkProgress.getTotalTaskCount();
+ runningTaskCount += sparkProgress.getRunningTaskCount();
+ completedTaskCount += sparkProgress.getSucceededTaskCount();
+ failedTaskCount += sparkProgress.getFailedTaskCount();
+ killedTaskCount += sparkProgress.getKilledTaskCount();
+ }
+
+ return new SparkProgress(
+ totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount, killedTaskCount);
+ }
+
+ @Override
+ public int[] getStageIds() {
+ return jobStateListener.getStageIds(jobId);
+ }
+
+ @Override
+ public Map<String, SparkProgress> getSparkStageProgress() {
+ Map<String, SparkProgress> stageProgresses = new HashMap<String, SparkProgress>();
+ int[] stageIds = jobStateListener.getStageIds(jobId);
+ if (stageIds != null) {
+ for (int stageId : stageIds) {
+ List<StageInfo> stageInfos = getStageInfo(stageId);
+ for (StageInfo stageInfo : stageInfos) {
+ Tuple2<Object, Object> tuple2 = new Tuple2<Object, Object>(stageInfo.stageId(),
+ stageInfo.attemptId());
+ UIData.StageUIData uiData = jobProgressListener.stageIdToData().get(tuple2).get();
+ if (uiData != null) {
+ int runningTaskCount = uiData.numActiveTasks();
+ int completedTaskCount = uiData.numCompleteTasks();
+ int failedTaskCount = uiData.numFailedTasks();
+ int totalTaskCount = stageInfo.numTasks();
+ int killedTaskCount = 0;
+ SparkProgress stageProgress = new SparkProgress(
+ totalTaskCount,
+ completedTaskCount,
+ runningTaskCount,
+ failedTaskCount,
+ killedTaskCount);
+ stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress);
+ }
+ }
+ }
+ }
+ return stageProgresses;
+ }
+
+ private List<StageInfo> getStageInfo(int stageId) {
+ List<StageInfo> stageInfos = new LinkedList<StageInfo>();
+
+ Map<Object, StageInfo> activeStages = mutableMapAsJavaMap(jobProgressListener.activeStages());
+ List<StageInfo> completedStages = bufferAsJavaList(jobProgressListener.completedStages());
+ List<StageInfo> failedStages = bufferAsJavaList(jobProgressListener.failedStages());
+
+ if (activeStages.containsKey(stageId)) {
+ stageInfos.add(activeStages.get(stageId));
+ } else {
+ for (StageInfo stageInfo : completedStages) {
+ if (stageInfo.stageId() == stageId) {
+ stageInfos.add(stageInfo);
+ }
+ }
+
+ for (StageInfo stageInfo : failedStages) {
+ if (stageInfo.stageId() == stageId) {
+ stageInfos.add(stageInfo);
+ }
+ }
+ }
+
+ return stageInfos;
+ }
+}
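One caveat in getSparkStageProgress(): stageIdToData() is a Scala map, so get(tuple2) returns an Option, and calling get() on an empty Option throws NoSuchElementException before the null check can take effect. A defensive variant (a sketch, not part of this revision) would test the Option first:

    // Sketch of a defensive lookup: Scala's Map.get returns an Option rather than null.
    scala.Option<UIData.StageUIData> uiDataOption =
        jobProgressListener.stageIdToData().get(tuple2);
    if (uiDataOption.isDefined()) {
      UIData.StageUIData uiData = uiDataOption.get();
      // ... build the SparkProgress for this stage/attempt as above ...
    }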