You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/22 19:51:31 UTC
svn commit: r1654003 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: ./
status/ status/impl/
Author: xuefu
Date: Thu Jan 22 18:51:30 2015
New Revision: 1654003
URL: http://svn.apache.org/r1654003
Log:
HIVE-9370: SparkJobMonitor timeout as sortByKey would launch extra Spark job before original job get submitted [Spark Branch] (Chengxiang via Xuefu)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Thu Jan 22 18:51:30 2015
@@ -137,7 +137,7 @@ public class LocalHiveSparkClient implem
int jobId = future.jobIds().get(0);
LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus(
sc, jobId, jobMetricsListener, sparkCounters, plan.getCachedRDDIds(), future);
- return new LocalSparkJobRef(Integer.toString(jobId), sparkJobStatus, sc);
+ return new LocalSparkJobRef(Integer.toString(jobId), hiveConf, sparkJobStatus, sc);
}
/**
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Thu Jan 22 18:51:30 2015
@@ -77,10 +77,13 @@ public class RemoteHiveSparkClient imple
private transient List<String> localJars = new ArrayList<String>();
private transient List<String> localFiles = new ArrayList<String>();
+ private final transient long sparkClientTimtout;
+
RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
this.hiveConf = hiveConf;
sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
remoteClient = SparkClientFactory.createClient(conf, hiveConf);
+ sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
}
@Override
@@ -90,17 +93,14 @@ public class RemoteHiveSparkClient imple
@Override
public int getExecutorCount() throws Exception {
- long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
Future<Integer> handler = remoteClient.getExecutorCount();
- return handler.get(timeout, TimeUnit.SECONDS).intValue();
+ return handler.get(sparkClientTimtout, TimeUnit.SECONDS).intValue();
}
@Override
public int getDefaultParallelism() throws Exception {
- long timeout = hiveConf.getTimeVar(
- HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
Future<Integer> handler = remoteClient.getDefaultParallelism();
- return handler.get(timeout, TimeUnit.SECONDS);
+ return handler.get(sparkClientTimtout, TimeUnit.SECONDS);
}
@Override
@@ -119,11 +119,10 @@ public class RemoteHiveSparkClient imple
byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
- long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
-
- JobHandle<Serializable> jobHandle = remoteClient.submit(
- new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes));
- return new RemoteSparkJobRef(jobHandle, new RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
+ JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes);
+ JobHandle<Serializable> jobHandle = remoteClient.submit(job);
+ RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimtout);
+ return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
}
private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Thu Jan 22 18:51:30 2015
@@ -49,7 +49,7 @@ import org.apache.hadoop.hive.ql.exec.sp
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
+import org.apache.hadoop.hive.ql.exec.spark.status.LocalSparkJobMonitor;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -65,7 +65,6 @@ import org.apache.hadoop.hive.ql.plan.Op
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.util.StringUtils;
@@ -102,23 +101,20 @@ public class SparkTask extends Task<Spar
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
+ rc = jobRef.monitorJob();
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
- if (sparkJobStatus != null) {
- SparkJobMonitor monitor = new SparkJobMonitor(conf, sparkJobStatus);
- rc = monitor.startMonitor();
+ if (rc == 0) {
sparkCounters = sparkJobStatus.getCounter();
- if (rc == 0 ) {
- // for RSC, we should get the counters after job has finished
- SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
- if (LOG.isInfoEnabled() && sparkStatistics != null) {
- LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
- logSparkStatistic(sparkStatistics);
- }
- } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
- jobRef.cancelJob();
+ // for RSC, we should get the counters after job has finished
+ SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
+ if (LOG.isInfoEnabled() && sparkStatistics != null) {
+ LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
+ logSparkStatistic(sparkStatistics);
}
- sparkJobStatus.cleanup();
+ } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
+ jobRef.cancelJob();
}
+ sparkJobStatus.cleanup();
} catch (Exception e) {
String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java?rev=1654003&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java Thu Jan 22 18:51:30 2015
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.spark.JobExecutionStatus;
+
+/**
+ * LocalSparkJobMonitor monitors a single Spark job's status in a loop until the job is finished, failed, or killed.
+ * It prints the current job status to the console and sleeps the current thread between monitoring intervals.
+ */
+public class LocalSparkJobMonitor extends SparkJobMonitor {
+
+ private SparkJobStatus sparkJobStatus;
+
+ public LocalSparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) {
+ super(hiveConf);
+ this.sparkJobStatus = sparkJobStatus;
+ }
+
+ public int startMonitor() {
+ boolean running = false;
+ boolean done = false;
+ int rc = 0;
+ JobExecutionStatus lastState = null;
+ Map<String, SparkStageProgress> lastProgressMap = null;
+
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+
+ long startTime = System.currentTimeMillis();
+
+ while (true) {
+ try {
+ JobExecutionStatus state = sparkJobStatus.getState();
+ if (LOG.isDebugEnabled()) {
+ console.printInfo("state = " + state);
+ }
+
+ if (state == null) {
+ long timeCount = (System.currentTimeMillis() - startTime)/1000;
+ if (timeCount > monitorTimeoutInteval) {
+ LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+ console.printError("Status: " + state);
+ running = false;
+ done = true;
+ rc = 2;
+ break;
+ }
+ } else if (state != lastState || state == JobExecutionStatus.RUNNING) {
+ lastState = state;
+ Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+
+ switch (state) {
+ case RUNNING:
+ if (!running) {
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+ // print job stages.
+ console.printInfo("\nQuery Hive on Spark job["
+ + sparkJobStatus.getJobId() + "] stages:");
+ for (int stageId : sparkJobStatus.getStageIds()) {
+ console.printInfo(Integer.toString(stageId));
+ }
+
+ console.printInfo("\nStatus: Running (Hive on Spark job["
+ + sparkJobStatus.getJobId() + "])");
+ running = true;
+
+ console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
+ + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
+ }
+
+ printStatus(progressMap, lastProgressMap);
+ lastProgressMap = progressMap;
+ break;
+ case SUCCEEDED:
+ printStatus(progressMap, lastProgressMap);
+ lastProgressMap = progressMap;
+ double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+ console.printInfo("Status: Finished successfully in "
+ + String.format("%.2f seconds", duration));
+ running = false;
+ done = true;
+ break;
+ case FAILED:
+ console.printError("Status: Failed");
+ running = false;
+ done = true;
+ rc = 3;
+ break;
+ case UNKNOWN:
+ console.printError("Status: Unknown");
+ running = false;
+ done = true;
+ rc = 4;
+ break;
+ }
+ }
+ if (!done) {
+ Thread.sleep(checkInterval);
+ }
+ } catch (Exception e) {
+ String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
+ msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg;
+
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ LOG.error(msg, e);
+ console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ rc = 1;
+ done = true;
+ } finally {
+ if (done) {
+ break;
+ }
+ }
+ }
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+ return rc;
+ }
+}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java?rev=1654003&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java Thu Jan 22 18:51:30 2015
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.spark.JobExecutionStatus;
+
+/**
+ * RemoteSparkJobMonitor monitors a RSC remote job's status in a loop until the job is finished, failed, or killed.
+ * It prints the current job status to the console and sleeps the current thread between monitoring intervals.
+ */
+public class RemoteSparkJobMonitor extends SparkJobMonitor {
+
+ private RemoteSparkJobStatus sparkJobStatus;
+
+ public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) {
+ super(hiveConf);
+ this.sparkJobStatus = sparkJobStatus;
+ }
+
+ @Override
+ public int startMonitor() {
+ boolean running = false;
+ boolean done = false;
+ int rc = 0;
+ Map<String, SparkStageProgress> lastProgressMap = null;
+
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+
+ long startTime = System.currentTimeMillis();
+
+ while (true) {
+ try {
+ JobHandle.State state = sparkJobStatus.getRemoteJobState();
+ if (LOG.isDebugEnabled()) {
+ console.printInfo("state = " + state);
+ }
+
+ switch (state) {
+ case SENT:
+ case QUEUED:
+ long timeCount = (System.currentTimeMillis() - startTime) / 1000;
+ if ((timeCount > monitorTimeoutInteval)) {
+ LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+ console.printError("Status: " + state);
+ running = false;
+ done = true;
+ rc = 2;
+ }
+ break;
+ case STARTED:
+ JobExecutionStatus sparkJobState = sparkJobStatus.getState();
+ if (sparkJobState == JobExecutionStatus.RUNNING) {
+ Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+ if (!running) {
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+ // print job stages.
+ console.printInfo("\nQuery Hive on Spark job["
+ + sparkJobStatus.getJobId() + "] stages:");
+ for (int stageId : sparkJobStatus.getStageIds()) {
+ console.printInfo(Integer.toString(stageId));
+ }
+
+ console.printInfo("\nStatus: Running (Hive on Spark job["
+ + sparkJobStatus.getJobId() + "])");
+ running = true;
+
+ console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
+ + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
+ }
+
+ printStatus(progressMap, lastProgressMap);
+ lastProgressMap = progressMap;
+ }
+ break;
+ case SUCCEEDED:
+ Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+ printStatus(progressMap, lastProgressMap);
+ lastProgressMap = progressMap;
+ double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+ console.printInfo("Status: Finished successfully in "
+ + String.format("%.2f seconds", duration));
+ running = false;
+ done = true;
+ break;
+ case FAILED:
+ console.printError("Status: Failed");
+ running = false;
+ done = true;
+ rc = 3;
+ break;
+ }
+
+ if (!done) {
+ Thread.sleep(checkInterval);
+ }
+ } catch (Exception e) {
+ String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
+ msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg;
+
+ // Has to use full name to make sure it does not conflict with
+ // org.apache.commons.lang.StringUtils
+ LOG.error(msg, e);
+ console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ rc = 1;
+ done = true;
+ } finally {
+ if (done) {
+ break;
+ }
+ }
+ }
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
+ return rc;
+ }
+}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Thu Jan 22 18:51:30 2015
@@ -15,8 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.hive.ql.exec.spark.status;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
@@ -26,147 +34,27 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.spark.JobExecutionStatus;
-
-/**
- * SparkJobMonitor monitor a single Spark job status in a loop until job finished/failed/killed.
- * It print current job status to console and sleep current thread between monitor interval.
- */
-public class SparkJobMonitor {
+abstract class SparkJobMonitor {
- private static final String CLASS_NAME = SparkJobMonitor.class.getName();
- private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+ protected static final String CLASS_NAME = SparkJobMonitor.class.getName();
+ protected static final Log LOG = LogFactory.getLog(CLASS_NAME);
+ protected static SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+ protected final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ protected final int checkInterval = 1000;
+ protected final long monitorTimeoutInteval;
- private transient LogHelper console;
- private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
- private final long monitorTimeoutInteval;
- private final int checkInterval = 1000;
+ private Set<String> completed = new HashSet<String>();
private final int printInterval = 3000;
- private final HiveConf hiveConf;
private long lastPrintTime;
- private Set<String> completed;
-
- private SparkJobStatus sparkJobStatus;
- public SparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) {
- this.sparkJobStatus = sparkJobStatus;
- this.hiveConf = hiveConf;
+ protected SparkJobMonitor(HiveConf hiveConf) {
monitorTimeoutInteval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
- console = new LogHelper(LOG);
}
- public int startMonitor() {
- completed = new HashSet<String>();
-
- boolean running = false;
- boolean done = false;
- int rc = 0;
- JobExecutionStatus lastState = null;
- Map<String, SparkStageProgress> lastProgressMap = null;
-
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
-
- long startTime = System.currentTimeMillis();
-
- while (true) {
- try {
- JobExecutionStatus state = sparkJobStatus.getState();
- if (LOG.isDebugEnabled()) {
- console.printInfo("state = " + state);
- }
-
- if (state == null) {
- long timeCount = (System.currentTimeMillis() - startTime)/1000;
- if (timeCount > monitorTimeoutInteval) {
- LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
- console.printError("Status: " + state);
- running = false;
- done = true;
- rc = 2;
- break;
- }
- } else if (state != lastState || state == JobExecutionStatus.RUNNING) {
- lastState = state;
- Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
-
- switch (state) {
- case RUNNING:
- if (!running) {
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
- // print job stages.
- console.printInfo("\nQuery Hive on Spark job["
- + sparkJobStatus.getJobId() + "] stages:");
- for (int stageId : sparkJobStatus.getStageIds()) {
- console.printInfo(Integer.toString(stageId));
- }
-
- console.printInfo("\nStatus: Running (Hive on Spark job["
- + sparkJobStatus.getJobId() + "])");
- running = true;
-
- console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
- + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
- }
-
- printStatus(progressMap, lastProgressMap);
- lastProgressMap = progressMap;
- break;
- case SUCCEEDED:
- printStatus(progressMap, lastProgressMap);
- lastProgressMap = progressMap;
- double duration = (System.currentTimeMillis() - startTime) / 1000.0;
- console.printInfo("Status: Finished successfully in "
- + String.format("%.2f seconds", duration));
- running = false;
- done = true;
- break;
- case FAILED:
- console.printError("Status: Failed");
- running = false;
- done = true;
- rc = 3;
- break;
- case UNKNOWN:
- console.printError("Status: Unknown");
- running = false;
- done = true;
- rc = 4;
- break;
- }
- }
- if (!done) {
- Thread.sleep(checkInterval);
- }
- } catch (Exception e) {
- String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
- msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg;
-
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- LOG.error(msg, e);
- console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
- rc = 1;
- done = true;
- } finally {
- if (done) {
- break;
- }
- }
- }
+ public abstract int startMonitor();
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
- return rc;
- }
-
- private void printStatus(Map<String, SparkStageProgress> progressMap,
- Map<String, SparkStageProgress> lastProgressMap) {
+ protected void printStatus(Map<String, SparkStageProgress> progressMap,
+ Map<String, SparkStageProgress> lastProgressMap) {
// do not print duplicate status while still in middle of print interval.
boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
@@ -217,13 +105,13 @@ public class SparkJobMonitor {
if (failed > 0) {
/* tasks finished but some failed */
reportBuffer.append(
- String.format(
- "%s: %d(-%d)/%d Finished with failed tasks\t",
- stageName, complete, failed, total));
+ String.format(
+ "%s: %d(-%d)/%d Finished with failed tasks\t",
+ stageName, complete, failed, total));
} else {
if (complete == total) {
reportBuffer.append(
- String.format("%s: %d/%d Finished\t", stageName, complete, total));
+ String.format("%s: %d/%d Finished\t", stageName, complete, total));
} else {
reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java Thu Jan 22 18:51:30 2015
@@ -25,4 +25,5 @@ public interface SparkJobRef {
public boolean cancelJob();
+ public int monitorJob();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java Thu Jan 22 18:51:30 2015
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.status.LocalSparkJobMonitor;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.spark.api.java.JavaSparkContext;
@@ -24,11 +26,18 @@ import org.apache.spark.api.java.JavaSpa
public class LocalSparkJobRef implements SparkJobRef {
private final String jobId;
- private final SparkJobStatus sparkJobStatus;
+ private final HiveConf hiveConf;
+ private final LocalSparkJobStatus sparkJobStatus;
private final JavaSparkContext javaSparkContext;
- public LocalSparkJobRef(String jobId, SparkJobStatus sparkJobStatus, JavaSparkContext javaSparkContext) {
+ public LocalSparkJobRef(
+ String jobId,
+ HiveConf hiveConf,
+ LocalSparkJobStatus sparkJobStatus,
+ JavaSparkContext javaSparkContext) {
+
this.jobId = jobId;
+ this.hiveConf = hiveConf;
this.sparkJobStatus = sparkJobStatus;
this.javaSparkContext = javaSparkContext;
}
@@ -49,4 +58,10 @@ public class LocalSparkJobRef implements
javaSparkContext.sc().cancelJob(id);
return true;
}
+
+ @Override
+ public int monitorJob() {
+ LocalSparkJobMonitor localSparkJobMonitor = new LocalSparkJobMonitor(hiveConf, sparkJobStatus);
+ return localSparkJobMonitor.startMonitor();
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java Thu Jan 22 18:51:30 2015
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hive.spark.client.JobHandle;
@@ -26,12 +28,14 @@ import java.io.Serializable;
public class RemoteSparkJobRef implements SparkJobRef {
private final String jobId;
- private final SparkJobStatus sparkJobStatus;
+ private final HiveConf hiveConf;
+ private final RemoteSparkJobStatus sparkJobStatus;
private final JobHandle<Serializable> jobHandler;
- public RemoteSparkJobRef(JobHandle<Serializable> jobHandler, SparkJobStatus sparkJobStatus) {
+ public RemoteSparkJobRef(HiveConf hiveConf, JobHandle<Serializable> jobHandler, RemoteSparkJobStatus sparkJobStatus) {
this.jobHandler = jobHandler;
this.jobId = jobHandler.getClientJobId();
+ this.hiveConf = hiveConf;
this.sparkJobStatus = sparkJobStatus;
}
@@ -49,4 +53,10 @@ public class RemoteSparkJobRef implement
public boolean cancelJob() {
return jobHandler.cancel(true);
}
+
+ @Override
+ public int monitorJob() {
+ RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, sparkJobStatus);
+ return remoteSparkJobMonitor.startMonitor();
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1654003&r1=1654002&r2=1654003&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Thu Jan 22 18:51:30 2015
@@ -153,6 +153,10 @@ public class RemoteSparkJobStatus implem
}
}
+ public JobHandle.State getRemoteJobState() {
+ return jobHandle.getState();
+ }
+
private static class GetJobInfoJob implements Job<SparkJobInfo> {
private final String clientJobId;
private final int sparkJobId;