You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2018/06/18 15:08:28 UTC
hive git commit: HIVE-19602: Refactor inplace progress code in
Hive-on-spark progress monitor to use ProgressMonitor instance
(Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar, Rui Li)
Repository: hive
Updated Branches:
refs/heads/master 3a6ad2661 -> c89cf6d5d
HIVE-19602: Refactor inplace progress code in Hive-on-spark progress monitor to use ProgressMonitor instance (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar, Rui Li)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c89cf6d5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c89cf6d5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c89cf6d5
Branch: refs/heads/master
Commit: c89cf6d5de0343493dc629a0073b5c8e88359a6e
Parents: 3a6ad26
Author: Bharathkrishna Guruvayoor Murali <bh...@cloudera.com>
Authored: Mon Jun 18 10:03:01 2018 -0500
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Mon Jun 18 10:03:01 2018 -0500
----------------------------------------------------------------------
.../ql/exec/spark/status/SparkJobMonitor.java | 166 +------------------
.../exec/spark/status/SparkProgressMonitor.java | 155 +++++++++++++++++
2 files changed, 160 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c89cf6d5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index e78b1cd..3531ac2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -22,13 +22,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.common.log.InPlaceUpdate;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.fusesource.jansi.Ansi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
@@ -38,8 +34,6 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
-import static org.fusesource.jansi.Ansi.ansi;
-
abstract class SparkJobMonitor {
protected static final String CLASS_NAME = SparkJobMonitor.class.getName();
@@ -48,6 +42,7 @@ abstract class SparkJobMonitor {
protected final PerfLogger perfLogger = SessionState.getPerfLogger();
protected final int checkInterval = 1000;
protected final long monitorTimeoutInterval;
+ private final InPlaceUpdate inPlaceUpdateFn;
private final Set<String> completed = new HashSet<String>();
private final int printInterval = 3000;
@@ -61,94 +56,20 @@ abstract class SparkJobMonitor {
FINISHED
}
- // in-place progress update related variables
protected final boolean inPlaceUpdate;
- private int lines = 0;
- private final PrintStream out;
-
- private static final int COLUMN_1_WIDTH = 16;
- private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s ";
- private static final String STAGE_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s ";
- private static final String HEADER = String.format(HEADER_FORMAT,
- "STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED");
- private static final int SEPARATOR_WIDTH = 86;
- private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-");
- private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
- private static final int progressBarChars = 30;
-
- private final NumberFormat secondsFormat = new DecimalFormat("#0.00");
protected SparkJobMonitor(HiveConf hiveConf) {
monitorTimeoutInterval = hiveConf.getTimeVar(
HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent();
console = new SessionState.LogHelper(LOG);
- out = SessionState.LogHelper.getInfoStream();
+ inPlaceUpdateFn = new InPlaceUpdate(SessionState.LogHelper.getInfoStream());
}
public abstract int startMonitor();
private void printStatusInPlace(Map<SparkStage, SparkStageProgress> progressMap) {
-
- StringBuilder reportBuffer = new StringBuilder();
-
- // Num of total and completed tasks
- int sumTotal = 0;
- int sumComplete = 0;
-
- // position the cursor to line 0
- repositionCursor();
-
- // header
- reprintLine(SEPARATOR);
- reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
- reprintLine(SEPARATOR);
-
- SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
- int idx = 0;
- final int numKey = keys.size();
- for (SparkStage stage : keys) {
- SparkStageProgress progress = progressMap.get(stage);
- final int complete = progress.getSucceededTaskCount();
- final int total = progress.getTotalTaskCount();
- final int running = progress.getRunningTaskCount();
- final int failed = progress.getFailedTaskCount();
- sumTotal += total;
- sumComplete += complete;
-
- String s = stage.toString();
- StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED;
- if (complete > 0 || running > 0 || failed > 0) {
- if (!perfLogger.startTimeHasMethod(PerfLogger.SPARK_RUN_STAGE + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
- }
- if (complete < total) {
- state = StageState.RUNNING;
- } else {
- state = StageState.FINISHED;
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_STAGE + s);
- completed.add(s);
- }
- }
-
- String attempt = String.valueOf(stage.getAttemptId());
- String stageName = "Stage-" + String.valueOf(stage.getStageId());
- String nameWithProgress = getNameWithProgress(stageName, complete, total);
-
- final int pending = total - complete - running;
- String stageStr = String.format(STAGE_FORMAT,
- nameWithProgress, attempt, state, total, complete, running, pending, failed);
- reportBuffer.append(stageStr);
- if (idx++ != numKey - 1) {
- reportBuffer.append("\n");
- }
- }
- reprintMultiLine(reportBuffer.toString());
- reprintLine(SEPARATOR);
- final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
- String footer = getFooter(numKey, completed.size(), progress, startTime);
- reprintLineWithColorAsBold(footer, Ansi.Color.RED);
- reprintLine(SEPARATOR);
+ inPlaceUpdateFn.render(getProgressMonitor(progressMap));
}
protected void printStatus(Map<SparkStage, SparkStageProgress> progressMap,
@@ -293,84 +214,7 @@ abstract class SparkJobMonitor {
return true;
}
- private void repositionCursor() {
- if (lines > 0) {
- out.print(ansi().cursorUp(lines).toString());
- out.flush();
- lines = 0;
- }
- }
-
- private void reprintLine(String line) {
- InPlaceUpdate.reprintLine(out, line);
- lines++;
- }
-
- private void reprintLineWithColorAsBold(String line, Ansi.Color color) {
- out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
- .toString());
- out.flush();
- lines++;
- }
-
- private String getNameWithProgress(String s, int complete, int total) {
- String result = "";
- if (s != null) {
- float percent = total == 0 ? 1.0f : (float) complete / (float) total;
- // lets use the remaining space in column 1 as progress bar
- int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
- String trimmedVName = s;
-
- // if the vertex name is longer than column 1 width, trim it down
- if (s.length() > COLUMN_1_WIDTH) {
- trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2);
- result = trimmedVName + "..";
- } else {
- result = trimmedVName + " ";
- }
-
- int toFill = (int) (spaceRemaining * percent);
- for (int i = 0; i < toFill; i++) {
- result += ".";
- }
- }
- return result;
- }
-
- // STAGES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s
- private String getFooter(int keySize, int completedSize, float progress, long startTime) {
- String verticesSummary = String.format("STAGES: %02d/%02d", completedSize, keySize);
- String progressBar = getInPlaceProgressBar(progress);
- final int progressPercent = (int) (progress * 100);
- String progressStr = "" + progressPercent + "%";
- float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
- String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s";
- String footer = String.format(FOOTER_FORMAT,
- verticesSummary, progressBar, progressStr, elapsedTime);
- return footer;
- }
-
- // [==================>>-----]
- private String getInPlaceProgressBar(float percent) {
- StringBuilder bar = new StringBuilder("[");
- int remainingChars = progressBarChars - 4;
- int completed = (int) (remainingChars * percent);
- int pending = remainingChars - completed;
- for (int i = 0; i < completed; i++) {
- bar.append("=");
- }
- bar.append(">>");
- for (int i = 0; i < pending; i++) {
- bar.append("-");
- }
- bar.append("]");
- return bar.toString();
- }
-
- private void reprintMultiLine(String line) {
- int numLines = line.split("\r\n|\r|\n").length;
- out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
- out.flush();
- lines += numLines;
+ private SparkProgressMonitor getProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap) {
+ return new SparkProgressMonitor(progressMap, startTime);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c89cf6d5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java
new file mode 100644
index 0000000..0c33db0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgressMonitor.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * This class defines various parts of the progress update bar.
+ * The progress bar is displayed in hive-cli and is typically rendered using InPlaceUpdate.
+ */
+class SparkProgressMonitor implements ProgressMonitor {
+
+ private Map<SparkStage, SparkStageProgress> progressMap;
+ private long startTime;
+ private static final int COLUMN_1_WIDTH = 16;
+
+ SparkProgressMonitor(Map<SparkStage, SparkStageProgress> progressMap, long startTime) {
+ this.progressMap = progressMap;
+ this.startTime = startTime;
+ }
+
+ @Override
+ public List<String> headers() {
+ return Arrays.asList("STAGES", "ATTEMPT", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "");
+ }
+
+ @Override
+ public List<List<String>> rows() {
+ List<List<String>> progressRows = new ArrayList<>();
+ SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+ for (SparkStage stage : keys) {
+ SparkStageProgress progress = progressMap.get(stage);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ final int running = progress.getRunningTaskCount();
+ final int failed = progress.getFailedTaskCount();
+
+ SparkJobMonitor.StageState state =
+ total > 0 ? SparkJobMonitor.StageState.PENDING : SparkJobMonitor.StageState.FINISHED;
+ if (complete > 0 || running > 0 || failed > 0) {
+ if (complete < total) {
+ state = SparkJobMonitor.StageState.RUNNING;
+ } else {
+ state = SparkJobMonitor.StageState.FINISHED;
+ }
+ }
+ String attempt = String.valueOf(stage.getAttemptId());
+ String stageName = "Stage-" +String.valueOf(stage.getStageId());
+ String nameWithProgress = getNameWithProgress(stageName, complete, total);
+ final int pending = total - complete - running;
+
+ progressRows.add(Arrays
+ .asList(nameWithProgress, attempt, state.toString(), String.valueOf(total), String.valueOf(complete),
+ String.valueOf(running), String.valueOf(pending), String.valueOf(failed), ""));
+ }
+ return progressRows;
+ }
+
+ @Override
+ public String footerSummary() {
+ return String.format("STAGES: %02d/%02d", getCompletedStages(), progressMap.keySet().size());
+ }
+
+ @Override
+ public long startTime() {
+ return startTime;
+ }
+
+ @Override
+ public String executionStatus() {
+ if (getCompletedStages() == progressMap.keySet().size()) {
+ return SparkJobMonitor.StageState.FINISHED.toString();
+ } else {
+ return SparkJobMonitor.StageState.RUNNING.toString();
+ }
+ }
+
+ @Override
+ public double progressedPercentage() {
+
+ SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+ int sumTotal = 0;
+ int sumComplete = 0;
+ for (SparkStage stage : keys) {
+ SparkStageProgress progress = progressMap.get(stage);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ sumTotal += total;
+ sumComplete += complete;
+ }
+ double progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+ return progress;
+ }
+
+ private int getCompletedStages() {
+ int completed = 0;
+ SortedSet<SparkStage> keys = new TreeSet<SparkStage>(progressMap.keySet());
+ for (SparkStage stage : keys) {
+ SparkStageProgress progress = progressMap.get(stage);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ if (total > 0 && complete == total) {
+ completed++;
+ }
+ }
+ return completed;
+ }
+
+ private String getNameWithProgress(String s, int complete, int total) {
+
+ if (s == null) {
+ return "";
+ }
+ float percent = total == 0 ? 1.0f : (float) complete / (float) total;
+ // let's use the remaining space in column 1 as a progress bar
+ int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
+ String trimmedVName = s;
+
+ // if the stage name is longer than the column 1 width, trim it down
+ if (s.length() > COLUMN_1_WIDTH) {
+ trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2);
+ trimmedVName += "..";
+ } else {
+ trimmedVName += " ";
+ }
+ StringBuilder result = new StringBuilder(trimmedVName);
+ int toFill = (int) (spaceRemaining * percent);
+ for (int i = 0; i < toFill; i++) {
+ result.append(".");
+ }
+ return result.toString();
+ }
+}