You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2018/07/02 18:47:53 UTC
hive git commit: HIVE-19176: Add HoS support to progress bar on
Beeline client (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)
Repository: hive
Updated Branches:
refs/heads/master e19b861cf -> e7d1781ec
HIVE-19176: Add HoS support to progress bar on Beeline client (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e7d1781e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e7d1781e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e7d1781e
Branch: refs/heads/master
Commit: e7d1781ec4662e088dcd6ffbe3f866738792ad9b
Parents: e19b861
Author: Bharathkrishna Guruvayoor Murali <bh...@cloudera.com>
Authored: Mon Jul 2 11:42:59 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Mon Jul 2 11:42:59 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../org/apache/hive/jdbc/HiveStatement.java | 4 +-
.../exec/spark/status/LocalSparkJobMonitor.java | 4 +-
.../spark/status/RemoteSparkJobMonitor.java | 4 +-
.../ql/exec/spark/status/RenderStrategy.java | 246 +++++++++++++++++++
.../ql/exec/spark/status/SparkJobMonitor.java | 157 +-----------
.../hive/ql/exec/spark/TestSparkTask.java | 1 +
.../exec/spark/status/TestSparkJobMonitor.java | 29 ++-
.../org/apache/hive/service/ServiceUtils.java | 5 +-
.../cli/SparkProgressMonitorStatusMapper.java | 52 ++++
.../service/cli/thrift/ThriftCLIService.java | 5 +-
11 files changed, 349 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index a3dd53e..7ef22d6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3780,7 +3780,7 @@ public class HiveConf extends Configuration {
"hive.server2.in.place.progress",
true,
"Allows hive server 2 to send progress bar update information. This is currently available"
- + " only if the execution engine is tez."),
+ + " only if the execution engine is tez or Spark."),
TEZ_DAG_STATUS_CHECK_INTERVAL("hive.tez.dag.status.check.interval", "500ms",
new TimeValidator(TimeUnit.MILLISECONDS), "Interval between subsequent DAG status invocation."),
SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index ad8d1a7..0b38f9c 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -376,7 +376,9 @@ public class HiveStatement implements java.sql.Statement {
* essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
*/
statusResp = client.GetOperationStatus(statusReq);
- inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse());
+ if(!isOperationComplete) {
+ inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse());
+ }
Utils.verifySuccessWithInfo(statusResp.getStatus());
if (statusResp.isSetOperationState()) {
switch (statusResp.getOperationState()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index 2a6c33b..aeef3c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -89,11 +89,11 @@ public class LocalSparkJobMonitor extends SparkJobMonitor {
+ "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
}
- printStatus(progressMap, lastProgressMap);
+ updateFunction.printStatus(progressMap, lastProgressMap);
lastProgressMap = progressMap;
break;
case SUCCEEDED:
- printStatus(progressMap, lastProgressMap);
+ updateFunction.printStatus(progressMap, lastProgressMap);
lastProgressMap = progressMap;
double duration = (System.currentTimeMillis() - startTime) / 1000.0;
console.printInfo("Status: Finished successfully in "
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 560fb58..87b69cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -131,13 +131,13 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
}
}
- printStatus(progressMap, lastProgressMap);
+ updateFunction.printStatus(progressMap, lastProgressMap);
lastProgressMap = progressMap;
}
break;
case SUCCEEDED:
Map<SparkStage, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
- printStatus(progressMap, lastProgressMap);
+ updateFunction.printStatus(progressMap, lastProgressMap);
lastProgressMap = progressMap;
double duration = (System.currentTimeMillis() - startTime) / 1000.0;
console.printInfo("Spark job[" + sparkJobStatus.getJobId() + "] finished successfully in "
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java
new file mode 100644
index 0000000..67a3a9c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RenderStrategy.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to render progress bar for Hive on Spark job status.
+ * Based on the configuration, the appropriate render strategy is selected
+ * to show the progress bar on Beeline or Hive CLI, as well as to log
+ * the report string.
+ */
+class RenderStrategy {
+
+ interface UpdateFunction {
+ void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
+ Map<SparkStage, SparkStageProgress> lastProgressMap);
+ }
+
+ private abstract static class BaseUpdateFunction implements UpdateFunction {
+ protected final SparkJobMonitor monitor;
+ private final PerfLogger perfLogger;
+ private long lastPrintTime;
+ private static final int PRINT_INTERVAL = 3000;
+ private final Set<String> completed = new HashSet<String>();
+ private String lastReport = null;
+
+ BaseUpdateFunction(SparkJobMonitor monitor) {
+ this.monitor = monitor;
+ this.perfLogger = SessionState.getPerfLogger();
+ }
+
+ private String getReport(Map<SparkStage, SparkStageProgress> progressMap) {
+ StringBuilder reportBuffer = new StringBuilder();
+ SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ String currentDate = dt.format(new Date());
+ reportBuffer.append(currentDate + "\t");
+
+ // Num of total and completed tasks
+ int sumTotal = 0;
+ int sumComplete = 0;
+
+ SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+ for (SparkStage stage : keys) {
+ SparkStageProgress progress = progressMap.get(stage);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ final int running = progress.getRunningTaskCount();
+ final int failed = progress.getFailedTaskCount();
+ sumTotal += total;
+ sumComplete += complete;
+ String s = stage.toString();
+ String stageName = "Stage-" + s;
+ if (total <= 0) {
+ reportBuffer.append(String.format("%s: -/-\t", stageName));
+ } else {
+ if (complete == total && !completed.contains(s)) {
+ completed.add(s);
+
+ if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+ perfLogger.PerfLogBegin(SparkJobMonitor.CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+ }
+ perfLogger.PerfLogEnd(SparkJobMonitor.CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+ }
+ if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+ /* stage is started, but not complete */
+ if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
+ perfLogger.PerfLogBegin(SparkJobMonitor.CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
+ }
+ if (failed > 0) {
+ reportBuffer.append(
+ String.format(
+ "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total));
+ } else {
+ reportBuffer.append(
+ String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
+ }
+ } else {
+ /* stage is waiting for input/slots or complete */
+ if (failed > 0) {
+ /* tasks finished but some failed */
+ reportBuffer.append(
+ String.format(
+ "%s: %d(-%d)/%d Finished with failed tasks\t",
+ stageName, complete, failed, total));
+ } else {
+ if (complete == total) {
+ reportBuffer.append(
+ String.format("%s: %d/%d Finished\t", stageName, complete, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
+ }
+ }
+ }
+ }
+ }
+
+ if (SessionState.get() != null) {
+ final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+ SessionState.get().updateProgressedPercentage(progress);
+ }
+ return reportBuffer.toString();
+ }
+
+ private boolean isSameAsPreviousProgress(
+ Map<SparkStage, SparkStageProgress> progressMap,
+ Map<SparkStage, SparkStageProgress> lastProgressMap) {
+
+ if (lastProgressMap == null) {
+ return false;
+ }
+
+ if (progressMap.isEmpty()) {
+ return lastProgressMap.isEmpty();
+ } else {
+ if (lastProgressMap.isEmpty()) {
+ return false;
+ } else {
+ if (progressMap.size() != lastProgressMap.size()) {
+ return false;
+ }
+ for (Map.Entry<SparkStage, SparkStageProgress> entry : progressMap.entrySet()) {
+ if (!lastProgressMap.containsKey(entry.getKey())
+ || !progressMap.get(entry.getKey()).equals(lastProgressMap.get(entry.getKey()))) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+
+ private boolean showReport(String report) {
+ return !report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL;
+ }
+
+ @Override
+ public void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
+ Map<SparkStage, SparkStageProgress> lastProgressMap) {
+ // do not print duplicate status while still in middle of print interval.
+ boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
+ boolean withinInterval = System.currentTimeMillis() <= lastPrintTime + PRINT_INTERVAL;
+ if (isDuplicateState && withinInterval) {
+ return;
+ }
+
+ String report = getReport(progressMap);
+ renderProgress(monitor.getProgressMonitor(progressMap));
+ if (showReport(report)) {
+ renderReport(report);
+ lastReport = report;
+ lastPrintTime = System.currentTimeMillis();
+ }
+ }
+
+ abstract void renderProgress(ProgressMonitor monitor);
+
+ abstract void renderReport(String report);
+ }
+
+ /**
+ * This is used to show progress bar on Beeline while using HiveServer2.
+ */
+ static class LogToFileFunction extends BaseUpdateFunction {
+ private static final Logger LOGGER = LoggerFactory.getLogger(LogToFileFunction.class);
+ private boolean hiveServer2InPlaceProgressEnabled =
+ SessionState.get().getConf().getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS);
+
+ LogToFileFunction(SparkJobMonitor monitor) {
+ super(monitor);
+ }
+
+ @Override
+ void renderProgress(ProgressMonitor monitor) {
+ SessionState.get().updateProgressMonitor(monitor);
+ }
+
+ @Override
+ void renderReport(String report) {
+ if (hiveServer2InPlaceProgressEnabled) {
+ LOGGER.info(report);
+ } else {
+ monitor.console.printInfo(report);
+ }
+ }
+ }
+
+ /**
+ * This is used to show progress bar on Hive CLI.
+ */
+ static class InPlaceUpdateFunction extends BaseUpdateFunction {
+ /**
+ * The same instance must be used for every render; otherwise the number of lines printed earlier
+ * is lost and the screen will print the table again and again.
+ */
+ private final InPlaceUpdate inPlaceUpdate;
+
+ InPlaceUpdateFunction(SparkJobMonitor monitor) {
+ super(monitor);
+ inPlaceUpdate = new InPlaceUpdate(SessionState.LogHelper.getInfoStream());
+ }
+
+ @Override
+ void renderProgress(ProgressMonitor monitor) {
+ inPlaceUpdate.render(monitor);
+ }
+
+ @Override
+ void renderReport(String report) {
+ monitor.console.logInfo(report);
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 3531ac2..5fd0c02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.spark.status;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.common.log.InPlaceUpdate;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -25,13 +26,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
abstract class SparkJobMonitor {
@@ -42,60 +37,27 @@ abstract class SparkJobMonitor {
protected final PerfLogger perfLogger = SessionState.getPerfLogger();
protected final int checkInterval = 1000;
protected final long monitorTimeoutInterval;
- private final InPlaceUpdate inPlaceUpdateFn;
-
- private final Set<String> completed = new HashSet<String>();
- private final int printInterval = 3000;
- private long lastPrintTime;
-
+ final RenderStrategy.UpdateFunction updateFunction;
protected long startTime;
protected enum StageState {
- PENDING,
- RUNNING,
- FINISHED
+ PENDING, RUNNING, FINISHED
}
protected final boolean inPlaceUpdate;
protected SparkJobMonitor(HiveConf hiveConf) {
- monitorTimeoutInterval = hiveConf.getTimeVar(
- HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
+ monitorTimeoutInterval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent();
console = new SessionState.LogHelper(LOG);
- inPlaceUpdateFn = new InPlaceUpdate(SessionState.LogHelper.getInfoStream());
+ updateFunction = updateFunction();
}
public abstract int startMonitor();
- private void printStatusInPlace(Map<SparkStage, SparkStageProgress> progressMap) {
- inPlaceUpdateFn.render(getProgressMonitor(progressMap));
- }
-
- protected void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
- Map<SparkStage, SparkStageProgress> lastProgressMap) {
-
- // do not print duplicate status while still in middle of print interval.
- boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap);
- boolean withinInterval = System.currentTimeMillis() <= lastPrintTime + printInterval;
- if (isDuplicateState && withinInterval) {
- return;
- }
-
- String report = getReport(progressMap);
- if (inPlaceUpdate) {
- printStatusInPlace(progressMap);
- console.logInfo(report);
- } else {
- console.printInfo(report);
- }
-
- lastPrintTime = System.currentTimeMillis();
- }
-
protected int getTotalTaskCount(Map<SparkStage, SparkStageProgress> progressMap) {
int totalTasks = 0;
- for (SparkStageProgress progress: progressMap.values() ) {
+ for (SparkStageProgress progress : progressMap.values()) {
totalTasks += progress.getTotalTaskCount();
}
@@ -104,7 +66,7 @@ abstract class SparkJobMonitor {
protected int getStageMaxTaskCount(Map<SparkStage, SparkStageProgress> progressMap) {
int stageMaxTasks = 0;
- for (SparkStageProgress progress: progressMap.values() ) {
+ for (SparkStageProgress progress : progressMap.values()) {
int tasks = progress.getTotalTaskCount();
if (tasks > stageMaxTasks) {
stageMaxTasks = tasks;
@@ -114,107 +76,12 @@ abstract class SparkJobMonitor {
return stageMaxTasks;
}
- private String getReport(Map<SparkStage, SparkStageProgress> progressMap) {
- StringBuilder reportBuffer = new StringBuilder();
- SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
- String currentDate = dt.format(new Date());
- reportBuffer.append(currentDate + "\t");
-
- // Num of total and completed tasks
- int sumTotal = 0;
- int sumComplete = 0;
-
- SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
- for (SparkStage stage : keys) {
- SparkStageProgress progress = progressMap.get(stage);
- final int complete = progress.getSucceededTaskCount();
- final int total = progress.getTotalTaskCount();
- final int running = progress.getRunningTaskCount();
- final int failed = progress.getFailedTaskCount();
- sumTotal += total;
- sumComplete += complete;
- String s = stage.toString();
- String stageName = "Stage-" + s;
- if (total <= 0) {
- reportBuffer.append(String.format("%s: -/-\t", stageName));
- } else {
- if (complete == total && !completed.contains(s)) {
- completed.add(s);
-
- if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
- }
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
- }
- if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
- /* stage is started, but not complete */
- if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
- }
- if (failed > 0) {
- reportBuffer.append(
- String.format(
- "%s: %d(+%d,-%d)/%d\t", stageName, complete, running, failed, total));
- } else {
- reportBuffer.append(
- String.format("%s: %d(+%d)/%d\t", stageName, complete, running, total));
- }
- } else {
- /* stage is waiting for input/slots or complete */
- if (failed > 0) {
- /* tasks finished but some failed */
- reportBuffer.append(
- String.format(
- "%s: %d(-%d)/%d Finished with failed tasks\t",
- stageName, complete, failed, total));
- } else {
- if (complete == total) {
- reportBuffer.append(
- String.format("%s: %d/%d Finished\t", stageName, complete, total));
- } else {
- reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total));
- }
- }
- }
- }
- }
-
- if (SessionState.get() != null) {
- final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
- SessionState.get().updateProgressedPercentage(progress);
- }
- return reportBuffer.toString();
- }
-
- private boolean isSameAsPreviousProgress(
- Map<SparkStage, SparkStageProgress> progressMap,
- Map<SparkStage, SparkStageProgress> lastProgressMap) {
-
- if (lastProgressMap == null) {
- return false;
- }
-
- if (progressMap.isEmpty()) {
- return lastProgressMap.isEmpty();
- } else {
- if (lastProgressMap.isEmpty()) {
- return false;
- } else {
- if (progressMap.size() != lastProgressMap.size()) {
- return false;
- }
- for (SparkStage key : progressMap.keySet()) {
- if (!lastProgressMap.containsKey(key)
- || !progressMap.get(key).equals(lastProgressMap.get(key))) {
- return false;
- }
- }
- }
- }
- return true;
+ ProgressMonitor getProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap) {
+ return new SparkProgressMonitor(progressMap, startTime);
}
- private SparkProgressMonitor getProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap) {
- return new SparkProgressMonitor(progressMap, startTime);
+ private RenderStrategy.UpdateFunction updateFunction() {
+ return inPlaceUpdate && !SessionState.get().isHiveServerQuery() ? new RenderStrategy.InPlaceUpdateFunction(
+ this) : new RenderStrategy.LogToFileFunction(this);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
index 368fa9f..2017fc1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkTask.java
@@ -105,6 +105,7 @@ public class TestSparkTask {
when(jobSts.getRemoteJobState()).thenReturn(State.CANCELLED);
when(jobSts.isRemoteActive()).thenReturn(true);
HiveConf hiveConf = new HiveConf();
+ SessionState.start(hiveConf);
RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, jobSts);
Assert.assertEquals(remoteSparkJobMonitor.startMonitor(), 3);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
index e66354f..7257b32 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestSparkJobMonitor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.spark.status;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,22 +41,23 @@ public class TestSparkJobMonitor {
private SparkJobMonitor monitor;
private PrintStream curOut;
private PrintStream curErr;
+ private RenderStrategy.InPlaceUpdateFunction updateFunction;
@Before
public void setUp() {
- testConf = new HiveConf();
curOut = System.out;
curErr = System.err;
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
-
+ testConf = new HiveConf();
+ SessionState.start(testConf);
monitor = new SparkJobMonitor(testConf) {
@Override
public int startMonitor() {
return 0;
}
};
-
+ updateFunction = new RenderStrategy.InPlaceUpdateFunction(monitor);
}
private Map<SparkStage, SparkStageProgress> progressMap() {
@@ -72,12 +74,27 @@ public class TestSparkJobMonitor {
}
@Test
- public void testGetReport() {
+ public void testProgress() {
Map<SparkStage, SparkStageProgress> progressMap = progressMap();
- monitor.printStatus(progressMap, null);
- assertTrue(errContent.toString().contains(
+ updateFunction.printStatus(progressMap, null);
+ String testOutput = errContent.toString();
+ assertTrue(testOutput.contains(
"Stage-1_0: 3(+1)/4\tStage-3_1: 4(+1,-1)/6\tStage-9_0: 5/5 Finished\tStage-10_2: 3(+2)/5\t"
+ "Stage-15_1: 3(+1)/4\tStage-15_2: 4/4 Finished\tStage-20_3: 1(+1,-1)/3\tStage-21_1: 2/2 Finished"));
+ String[] testStrings = new String[]{
+ "STAGES ATTEMPT STATUS TOTAL COMPLETED RUNNING PENDING FAILED",
+ "Stage-1 ...... 0 RUNNING 4 3 1 0 0",
+ "Stage-3 ..... 1 RUNNING 6 4 1 1 1",
+ "Stage-9 ........ 0 FINISHED 5 5 0 0 0",
+ "Stage-10 .... 2 RUNNING 5 3 2 0 0",
+ "Stage-15 ..... 1 RUNNING 4 3 1 0 0",
+ "Stage-15 ....... 2 FINISHED 4 4 0 0 0",
+ "Stage-20 .. 3 RUNNING 3 1 1 1 1",
+ "Stage-21 ....... 1 FINISHED 2 2 0 0 0",
+ "STAGES: 03/08 [===================>>-------] 75% ELAPSED TIME:"};
+ for(String testString : testStrings) {
+ assertTrue(testOutput.contains(testString));
+ }
}
@After
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/ServiceUtils.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/ServiceUtils.java b/service/src/java/org/apache/hive/service/ServiceUtils.java
index 226e432..49fb5d5 100644
--- a/service/src/java/org/apache/hive/service/ServiceUtils.java
+++ b/service/src/java/org/apache/hive/service/ServiceUtils.java
@@ -69,8 +69,9 @@ public class ServiceUtils {
}
public static boolean canProvideProgressLog(HiveConf hiveConf) {
- return "tez".equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))
- && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS);
+ return ("tez".equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)) || "spark"
+ .equals(hiveConf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) && hiveConf
+ .getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_INPLACE_PROGRESS);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java b/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java
new file mode 100644
index 0000000..c2a222e
--- /dev/null
+++ b/service/src/java/org/apache/hive/service/cli/SparkProgressMonitorStatusMapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.cli;
+
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Maps status of spark stages to job execution status.
+ */
+public class SparkProgressMonitorStatusMapper implements ProgressMonitorStatusMapper {
+ /**
+ * These states are taken from DAGStatus.State; it could not be used here directly because it is an
+ * optional dependency and we did not want to include it just for the enum.
+ */
+ enum SparkStatus {
+ PENDING, RUNNING, FINISHED
+
+ }
+
+ @Override
+ public TJobExecutionStatus forStatus(String status) {
+ if (StringUtils.isEmpty(status)) {
+ return TJobExecutionStatus.NOT_AVAILABLE;
+ }
+ SparkProgressMonitorStatusMapper.SparkStatus sparkStatus =
+ SparkProgressMonitorStatusMapper.SparkStatus.valueOf(status);
+ switch (sparkStatus) {
+ case PENDING:
+ case RUNNING:
+ return TJobExecutionStatus.IN_PROGRESS;
+ default:
+ return TJobExecutionStatus.COMPLETE;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/e7d1781e/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 68fe8d8..259ca63 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -21,6 +21,7 @@ package org.apache.hive.service.cli.thrift;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.SparkProgressMonitorStatusMapper;
import org.apache.hive.service.rpc.thrift.TSetClientInfoReq;
import org.apache.hive.service.rpc.thrift.TSetClientInfoResp;
@@ -707,7 +708,9 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
if ("tez".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
mapper = new TezProgressMonitorStatusMapper();
}
-
+ if ("spark".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+ mapper = new SparkProgressMonitorStatusMapper();
+ }
TJobExecutionStatus executionStatus =
mapper.forStatus(progressUpdate.status);
resp.setProgressUpdateResponse(new TProgressUpdateResp(