You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") was not preserved in this copy.
Posted to commits@hive.apache.org by th...@apache.org on 2017/02/21 20:48:30 UTC
hive git commit: HIVE-15847 : In Progress update refreshes seem slow
(Anishek Agarwal, via Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master 7fa8e37fd -> e17a0409c
HIVE-15847 : In Progress update refreshes seem slow (Anishek Agarwal, via Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e17a0409
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e17a0409
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e17a0409
Branch: refs/heads/master
Commit: e17a0409c920bcf75d8915047c3318d621e2ced0
Parents: 7fa8e37
Author: Anishek Agarwal <an...@gmail.com>
Authored: Tue Feb 21 12:48:19 2017 -0800
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Tue Feb 21 12:48:25 2017 -0800
----------------------------------------------------------------------
.../hadoop/hive/common/log/InPlaceUpdate.java | 1 +
.../hive/ql/exec/tez/monitoring/DAGSummary.java | 12 +-
.../ql/exec/tez/monitoring/RenderStrategy.java | 154 +++++++++++++++++++
.../ql/exec/tez/monitoring/TezJobMonitor.java | 110 ++-----------
4 files changed, 168 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e17a0409/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
index bfdb4fa..6db5c18 100644
--- a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
+++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
@@ -163,6 +163,7 @@ public class InPlaceUpdate {
progressStr,
elapsedTime);
+ reprintLine(SEPARATOR);
reprintLineWithColorAsBold(footer, Ansi.Color.RED);
reprintLine(SEPARATOR);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e17a0409/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
index 5840ad6..1400be4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -26,16 +26,11 @@ import java.util.*;
class DAGSummary implements PrintSummary {
- private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdate.MIN_TERMINAL_WIDTH + 34;
- private static final String FILE_HEADER_SEPARATOR = new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-");
-
- private static final String FORMATTING_PATTERN = "%10s %12s %16s %13s %14s %13s %12s %14s %15s";
+ private static final String FILE_HEADER_SEPARATOR = new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace("\0", "-");
+ private static final String FORMATTING_PATTERN = "%10s %17s %14s %14s %15s %16s";
private static final String FILE_HEADER = String.format(
FORMATTING_PATTERN,
"VERTICES",
- "TOTAL_TASKS",
- "FAILED_ATTEMPTS",
- "KILLED_TASKS",
"DURATION(ms)",
"CPU_TIME(ms)",
"GC_TIME(ms)",
@@ -170,9 +165,6 @@ class DAGSummary implements PrintSummary {
return String.format(FORMATTING_PATTERN,
vertexName,
- progress.getTotalTaskCount(),
- progress.getFailedTaskAttemptCount(),
- progress.getKilledTaskAttemptCount(),
secondsFormatter.format((duration)),
commaFormatter.format(cpuTimeMillis),
commaFormatter.format(gcTimeMillis),
http://git-wip-us.apache.org/repos/asf/hive/blob/e17a0409/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
new file mode 100644
index 0000000..bb9a5e7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
@@ -0,0 +1,154 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+ class RenderStrategy { // holds the UpdateFunction strategies that TezJobMonitor chooses between
+
+ interface UpdateFunction { // receives the DAG status and per-vertex progress from TezJobMonitor
+ void update(DAGStatus status, Map<String, Progress> vertexProgressMap);
+ }
+
+ private abstract static class BaseUpdateFunction implements UpdateFunction { // shared report building + throttling
+ private static final int PRINT_INTERVAL = 3000; // ms; an unchanged report is re-emitted at most this often
+
+ final TezJobMonitor monitor;
+ private final PerfLogger perfLogger;
+
+ private long lastPrintTime = 0L; // System.currentTimeMillis() of the last emitted report
+ private String lastReport = null; // last emitted report; null until the first one is emitted
+
+ BaseUpdateFunction(TezJobMonitor monitor) {
+ this.monitor = monitor;
+ perfLogger = SessionState.getPerfLogger();
+ }
+
+ @Override
+ public void update(DAGStatus status, Map<String, Progress> vertexProgressMap) { // progress on every call; report only when throttle allows
+ renderProgress(monitor.progressMonitor(status, vertexProgressMap));
+ String report = getReport(vertexProgressMap); // always built: it also records TEZ_RUN_VERTEX perf-log entries
+ if (showReport(report)) {
+ renderReport(report);
+ lastReport = report;
+ lastPrintTime = System.currentTimeMillis();
+ }
+ }
+
+ private boolean showReport(String report) { // true if the report changed or PRINT_INTERVAL ms have elapsed
+ return !report.equals(lastReport)
+ || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL;
+ }
+
+ private String getReport(Map<String, Progress> progressMap) { // "name: done(+running,-failed)/total" per vertex, sorted by name
+ StringBuilder reportBuffer = new StringBuilder(); // plain builder: no Writer API or StringBuffer locking needed
+
+ SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+ for (String s : keys) {
+ Progress progress = progressMap.get(s);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ final int running = progress.getRunningTaskCount();
+ final int failed = progress.getFailedTaskAttemptCount();
+ if (total <= 0) {
+ reportBuffer.append(String.format("%s: -/-\t", s));
+ } else {
+ if (complete == total) {
+ /* The vertex may have started and finished between two update() calls, so begin its
+ * perf-log entry if it was never opened. NOTE(review): PerfLogEnd below runs again on
+ * every update() while complete == total; confirm repeated end calls are intended. */
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+
+ perfLogger.PerfLogEnd(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+ if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+
+ /* vertex has started but is not complete */
+ if (failed > 0) {
+ reportBuffer.append(
+ String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+ }
+ } else {
+ /* vertex is waiting for input/slots, or is complete */
+ if (failed > 0) {
+ /* tasks finished but some attempts failed */
+ reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+ }
+ }
+ }
+ }
+
+ return reportBuffer.toString();
+ }
+
+ abstract void renderProgress(ProgressMonitor progressMonitor); // called on every update()
+
+ abstract void renderReport(String report); // called only when showReport() allows
+ }
+
+ /**
+ * Publishes the progress monitor to the session state, which HS2 uses to send the same
+ * information to the beeline client on request; the text report goes to console.printInfo.
+ */
+ static class LogToFileFunction extends BaseUpdateFunction {
+
+ LogToFileFunction(TezJobMonitor monitor) {
+ super(monitor);
+ }
+
+ @Override
+ public void renderProgress(ProgressMonitor progressMonitor) {
+ SessionState.get().updateProgressMonitor(progressMonitor);
+ }
+
+ @Override
+ public void renderReport(String report) {
+ monitor.console.printInfo(report);
+ }
+ }
+
+ /**
+ * Used when the progress update is rendered in place by this process, typically in
+ * hive-cli mode; the text report is only logged via console.logInfo.
+ */
+ static class InPlaceUpdateFunction extends BaseUpdateFunction {
+ /**
+ * The same instance must render every update; otherwise the number of lines printed earlier
+ * is lost and the screen prints the table again and again.
+ */
+ private final InPlaceUpdate inPlaceUpdate;
+
+ InPlaceUpdateFunction(TezJobMonitor monitor) {
+ super(monitor);
+ inPlaceUpdate = new InPlaceUpdate(SessionState.LogHelper.getInfoStream());
+ }
+
+ @Override
+ public void renderProgress(ProgressMonitor progressMonitor) {
+ inPlaceUpdate.render(progressMonitor);
+ }
+
+ @Override
+ public void renderReport(String report) {
+ monitor.console.logInfo(report);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/e17a0409/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index c0a068d..f2f97f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -47,8 +47,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
@@ -59,24 +57,18 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
*/
public class TezJobMonitor {
- private static final String CLASS_NAME = TezJobMonitor.class.getName();
+ static final String CLASS_NAME = TezJobMonitor.class.getName();
private static final int CHECK_INTERVAL = 200;
private static final int MAX_RETRY_INTERVAL = 2500;
- private static final int PRINT_INTERVAL = 3000;
private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final List<DAGClient> shutdownList;
private final Map<String, BaseWork> workMap;
- private transient LogHelper console;
+ transient LogHelper console;
- private long lastPrintTime;
private StringWriter diagnostics = new StringWriter();
- interface UpdateFunction {
- void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report);
- }
-
static {
shutdownList = new LinkedList<>();
ShutdownHookManager.addShutdownHook(new Runnable() {
@@ -102,12 +94,7 @@ public class TezJobMonitor {
private final DAG dag;
private final Context context;
private long executionStartTime = 0;
- private final UpdateFunction updateFunction;
- /**
- * Have to use the same instance to render else the number lines printed earlier is lost and the
- * screen will print the table again and again.
- */
- private final InPlaceUpdate inPlaceUpdate;
+ private final RenderStrategy.UpdateFunction updateFunction;
public TezJobMonitor(Map<String, BaseWork> workMap, final DAGClient dagClient, HiveConf conf, DAG dag,
Context ctx) {
@@ -117,29 +104,15 @@ public class TezJobMonitor {
this.dag = dag;
this.context = ctx;
console = SessionState.getConsole();
- inPlaceUpdate = new InPlaceUpdate(LogHelper.getInfoStream());
updateFunction = updateFunction();
}
- private UpdateFunction updateFunction() {
- UpdateFunction logToFileFunction = new UpdateFunction() {
- @Override
- public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
- SessionState.get().updateProgressMonitor(progressMonitor(status, vertexProgressMap));
- console.printInfo(report);
- }
- };
- UpdateFunction inPlaceUpdateFunction = new UpdateFunction() {
- @Override
- public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
- inPlaceUpdate.render(progressMonitor(status, vertexProgressMap));
- console.logInfo(report);
- }
- };
+ private RenderStrategy.UpdateFunction updateFunction() {
return InPlaceUpdate.canRenderInPlace(hiveConf)
&& !SessionState.getConsole().getIsSilent()
&& !SessionState.get().isHiveServerQuery()
- ? inPlaceUpdateFunction : logToFileFunction;
+ ? new RenderStrategy.InPlaceUpdateFunction(this)
+ : new RenderStrategy.LogToFileFunction(this);
}
private boolean isProfilingEnabled() {
@@ -163,7 +136,6 @@ public class TezJobMonitor {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
DAGStatus.State lastState = null;
- String lastReport = null;
boolean running = false;
while (true) {
@@ -195,13 +167,13 @@ public class TezJobMonitor {
this.executionStartTime = System.currentTimeMillis();
running = true;
}
- lastReport = updateStatus(status, vertexProgressMap, lastReport);
+ updateFunction.update(status, vertexProgressMap);
break;
case SUCCEEDED:
if (!running) {
this.executionStartTime = monitorStartTime;
}
- lastReport = updateStatus(status, vertexProgressMap, lastReport);
+ updateFunction.update(status, vertexProgressMap);
success = true;
running = false;
done = true;
@@ -210,7 +182,7 @@ public class TezJobMonitor {
if (!running) {
this.executionStartTime = monitorStartTime;
}
- lastReport = updateStatus(status, vertexProgressMap, lastReport);
+ updateFunction.update(status, vertexProgressMap);
console.printInfo("Status: Killed");
running = false;
done = true;
@@ -221,7 +193,7 @@ public class TezJobMonitor {
if (!running) {
this.executionStartTime = monitorStartTime;
}
- lastReport = updateStatus(status, vertexProgressMap, lastReport);
+ updateFunction.update(status, vertexProgressMap);
console.printError("Status: Failed");
running = false;
done = true;
@@ -323,71 +295,11 @@ public class TezJobMonitor {
return (tezCounter == null) ? 0 : tezCounter.getValue();
}
- private String updateStatus(DAGStatus status, Map<String, Progress> vertexProgressMap,
- String lastReport) {
- String report = getReport(vertexProgressMap);
- if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL) {
- updateFunction.update(status, vertexProgressMap, report);
- lastPrintTime = System.currentTimeMillis();
- }
- return report;
- }
-
- private String getReport(Map<String, Progress> progressMap) {
- StringBuilder reportBuffer = new StringBuilder();
-
- SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
- for (String s : keys) {
- Progress progress = progressMap.get(s);
- final int complete = progress.getSucceededTaskCount();
- final int total = progress.getTotalTaskCount();
- final int running = progress.getRunningTaskCount();
- final int failed = progress.getFailedTaskAttemptCount();
- if (total <= 0) {
- reportBuffer.append(String.format("%s: -/-\t", s));
- } else {
- if (complete == total) {
- /*
- * We may have missed the start of the vertex due to the 3 seconds interval
- */
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
-
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
- if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
-
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
-
- /* vertex is started, but not complete */
- if (failed > 0) {
- reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
- } else {
- reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
- }
- } else {
- /* vertex is waiting for input/slots or complete */
- if (failed > 0) {
- /* tasks finished but some failed */
- reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
- } else {
- reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
- }
- }
- }
- }
-
- return reportBuffer.toString();
- }
-
public String getDiagnostics() {
return diagnostics.toString();
}
- private ProgressMonitor progressMonitor(DAGStatus status, Map<String, Progress> progressMap) {
+ ProgressMonitor progressMonitor(DAGStatus status, Map<String, Progress> progressMap) {
try {
return new TezProgressMonitor(dagClient, status, workMap, progressMap, console,
executionStartTime);