You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2017/02/07 20:12:37 UTC
[4/4] hive git commit: HIVE-15473: Progress Bar on Beeline client
(Anishek Agarwal via Thejas Nair)
HIVE-15473: Progress Bar on Beeline client (Anishek Agarwal via Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3e01ef32
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3e01ef32
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3e01ef32
Branch: refs/heads/master
Commit: 3e01ef3268ffbcb69c5c18c2c9f8810512c91bf8
Parents: f6cdbc8
Author: Anishek Agarwal <an...@gmail.com>
Authored: Fri Jan 6 14:31:21 2017 +0530
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Tue Feb 7 12:12:27 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/hive/beeline/Commands.java | 84 +-
.../logs/BeelineInPlaceUpdateStream.java | 66 ++
common/pom.xml | 5 +
.../hadoop/hive/common/log/InPlaceUpdate.java | 202 ++++
.../hadoop/hive/common/log/ProgressMonitor.java | 51 +
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +-
.../TestOperationLoggingAPIWithMr.java | 2 +-
.../TestOperationLoggingAPIWithTez.java | 2 +-
.../org/apache/hive/jdbc/HiveStatement.java | 13 +
.../hive/jdbc/logs/InPlaceUpdateStream.java | 14 +
ql/pom.xml | 5 -
.../hadoop/hive/ql/exec/InPlaceUpdates.java | 89 --
.../hive/ql/exec/SerializationUtilities.java | 1 -
.../ql/exec/spark/status/SparkJobMonitor.java | 6 +-
.../hive/ql/exec/tez/TezJobExecHelper.java | 5 +-
.../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 1016 -----------------
.../hive/ql/exec/tez/TezSessionState.java | 8 +-
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 6 +-
.../hive/ql/exec/tez/monitoring/Constants.java | 7 +
.../hive/ql/exec/tez/monitoring/DAGSummary.java | 197 ++++
.../exec/tez/monitoring/FSCountersSummary.java | 92 ++
.../ql/exec/tez/monitoring/LLAPioSummary.java | 108 ++
.../ql/exec/tez/monitoring/PrintSummary.java | 7 +
.../QueryExecutionBreakdownSummary.java | 75 ++
.../ql/exec/tez/monitoring/TezJobMonitor.java | 397 +++++++
.../exec/tez/monitoring/TezProgressMonitor.java | 313 ++++++
.../apache/hadoop/hive/ql/metadata/Hive.java | 11 +-
.../hadoop/hive/ql/session/SessionState.java | 12 +
.../tez/monitoring/TestTezProgressMonitor.java | 101 ++
service-rpc/if/TCLIService.thrift | 26 +-
.../gen/thrift/gen-cpp/TCLIService_types.cpp | 322 ++++++
.../src/gen/thrift/gen-cpp/TCLIService_types.h | 102 +-
.../rpc/thrift/TGetOperationStatusReq.java | 109 +-
.../rpc/thrift/TGetOperationStatusResp.java | 116 +-
.../service/rpc/thrift/TJobExecutionStatus.java | 48 +
.../service/rpc/thrift/TProgressUpdateResp.java | 1033 ++++++++++++++++++
service-rpc/src/gen/thrift/gen-php/Types.php | 327 ++++++
.../src/gen/thrift/gen-py/TCLIService/ttypes.py | 214 +++-
.../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 51 +-
.../org/apache/hive/service/cli/CLIService.java | 63 +-
.../service/cli/EmbeddedCLIServiceClient.java | 4 +-
.../apache/hive/service/cli/ICLIService.java | 2 +-
.../hive/service/cli/JobProgressUpdate.java | 38 +
.../hive/service/cli/OperationStatus.java | 8 +
.../cli/ProgressMonitorStatusMapper.java | 19 +
.../cli/TezProgressMonitorStatusMapper.java | 32 +
.../thrift/RetryingThriftCLIServiceClient.java | 5 +-
.../service/cli/thrift/ThriftCLIService.java | 28 +-
.../cli/thrift/ThriftCLIServiceClient.java | 3 +-
.../apache/hive/service/cli/CLIServiceTest.java | 18 +-
.../cli/TestRetryingThriftCLIServiceClient.java | 2 +-
.../cli/thrift/ThriftCLIServiceTest.java | 8 +-
.../thrift/ThriftCliServiceTestWithCookie.java | 2 +-
53 files changed, 4268 insertions(+), 1214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/beeline/src/java/org/apache/hive/beeline/Commands.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java
index 748546d..99db643 100644
--- a/beeline/src/java/org/apache/hive/beeline/Commands.java
+++ b/beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.SystemVariables;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hive.beeline.logs.BeelineInPlaceUpdateStream;
import org.apache.hive.jdbc.HiveStatement;
import org.apache.hive.jdbc.Utils;
import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
@@ -982,6 +983,11 @@ public class Commands {
logThread = new Thread(createLogRunnable(stmnt));
logThread.setDaemon(true);
logThread.start();
+ if (stmnt instanceof HiveStatement) {
+ ((HiveStatement) stmnt).setInPlaceUpdateStream(
+ new BeelineInPlaceUpdateStream(beeLine.getOutputStream())
+ );
+ }
hasResults = stmnt.execute(sql);
logThread.interrupt();
logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
@@ -1242,43 +1248,65 @@ public class Commands {
command.setLength(0);
}
- private Runnable createLogRunnable(Statement statement) {
+  /**
+   * Creates the task that streams the operation log of {@code statement} to the beeline console.
+   *
+   * @param statement the statement being executed
+   * @return a {@link LogRunnable} polling every DEFAULT_QUERY_PROGRESS_INTERVAL when
+   *     {@code statement} is a {@link HiveStatement}; otherwise a Runnable that does nothing
+   */
+ private Runnable createLogRunnable(final Statement statement) {
if (statement instanceof HiveStatement) {
- final HiveStatement hiveStatement = (HiveStatement) statement;
-
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- while (hiveStatement.hasMoreLogs()) {
- try {
- // fetch the log periodically and output to beeline console
- for (String log : hiveStatement.getQueryLog()) {
- beeLine.info(log);
- }
- Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
- } catch (SQLException e) {
- beeLine.error(new SQLWarning(e));
- return;
- } catch (InterruptedException e) {
- beeLine.debug("Getting log thread is interrupted, since query is done!");
- showRemainingLogsIfAny(hiveStatement);
- return;
- }
- }
- }
- };
- return runnable;
+      // The polling loop now lives in the LogRunnable nested class.
+ return new LogRunnable(this, (HiveStatement) statement,
+ DEFAULT_QUERY_PROGRESS_INTERVAL);
} else {
- beeLine.debug("The statement instance is not HiveStatement type: " + statement.getClass());
+ beeLine.debug(
+ "The statement instance is not HiveStatement type: " + statement
+ .getClass());
return new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
// do nothing.
}
};
}
}
+  /** Reports {@code throwable} through the owning BeeLine instance. */
+  private void error(Throwable throwable) {
+    beeLine.error(throwable);
+  }
+
+  /** Emits {@code message} at debug level through the owning BeeLine instance. */
+  private void debug(String message) {
+    beeLine.debug(message);
+  }
+
+  /**
+   * Background task that periodically fetches the operation log of a running
+   * {@link HiveStatement} and prints it to the beeline console.
+   *
+   * <p>Polling stops when the statement reports no more logs, when fetching the log fails with an
+   * {@link SQLException} (reported once as a {@link SQLWarning}), or when the thread is interrupted
+   * because the query is done, in which case any remaining logs are flushed first.
+   */
+  static class LogRunnable implements Runnable {
+    private final Commands commands;
+    private final HiveStatement hiveStatement;
+    private final long queryProgressInterval;
+
+    /**
+     * @param commands owning Commands instance, used for console output
+     * @param hiveStatement statement whose operation log is polled
+     * @param queryProgressInterval pause between polls, in milliseconds
+     */
+    LogRunnable(Commands commands, HiveStatement hiveStatement,
+        long queryProgressInterval) {
+      this.hiveStatement = hiveStatement;
+      this.commands = commands;
+      this.queryProgressInterval = queryProgressInterval;
+    }
+
+    /** Prints every log line currently available from the statement. */
+    private void updateQueryLog() throws SQLException {
+      for (String log : hiveStatement.getQueryLog()) {
+        commands.beeLine.info(log);
+      }
+    }
+
+    @Override
+    public void run() {
+      // The try/catch wraps the whole loop so that an error or an interrupt ends polling.
+      // Catching inside the loop would keep polling after the query finished (sleep() clears the
+      // interrupt flag) and would spin without sleeping on a persistent SQLException, printing
+      // the same warning over and over.
+      try {
+        while (hiveStatement.hasMoreLogs()) {
+          updateQueryLog();
+          Thread.sleep(queryProgressInterval);
+        }
+      } catch (SQLException e) {
+        commands.error(new SQLWarning(e));
+      } catch (InterruptedException e) {
+        commands.debug("Getting log thread is interrupted, since query is done!");
+        commands.showRemainingLogsIfAny(hiveStatement);
+        // Restore the interrupt flag cleared by sleep(), after the final log flush.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
private void showRemainingLogsIfAny(Statement statement) {
if (statement instanceof HiveStatement) {
HiveStatement hiveStatement = (HiveStatement) statement;
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
new file mode 100644
index 0000000..2ed289c
--- /dev/null
+++ b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
@@ -0,0 +1,66 @@
+package org.apache.hive.beeline.logs;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hive.jdbc.logs.InPlaceUpdateStream;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * {@link InPlaceUpdateStream} that renders server-side progress updates in place on a beeline
+ * console stream, using {@link InPlaceUpdate}.
+ */
+public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
+  private final InPlaceUpdate inPlaceUpdate;
+
+  /**
+   * @param out stream the progress table is rendered to
+   */
+  public BeelineInPlaceUpdateStream(PrintStream out) {
+    this.inPlaceUpdate = new InPlaceUpdate(out);
+  }
+
+  /**
+   * Renders {@code response}, or does nothing when it carries no progress to show.
+   *
+   * @param response progress snapshot from the server; may be {@code null}
+   */
+  @Override
+  public void update(TProgressUpdateResp response) {
+    if (response == null) {
+      return;
+    }
+    TJobExecutionStatus status = response.getStatus();
+    // An unset (null) status is treated like NOT_AVAILABLE. Dereferencing it would throw a
+    // NullPointerException out of the statement's status-polling loop and fail the query.
+    if (status == null || status == TJobExecutionStatus.NOT_AVAILABLE) {
+      return;
+    }
+    inPlaceUpdate.render(new ProgressMonitorWrapper(response));
+  }
+
+  /** Adapts a {@link TProgressUpdateResp} to the {@link ProgressMonitor} view used for rendering. */
+  static class ProgressMonitorWrapper implements ProgressMonitor {
+    private final TProgressUpdateResp response;
+
+    ProgressMonitorWrapper(TProgressUpdateResp response) {
+      this.response = response;
+    }
+
+    @Override
+    public List<String> headers() {
+      return response.getHeaderNames();
+    }
+
+    @Override
+    public List<List<String>> rows() {
+      return response.getRows();
+    }
+
+    @Override
+    public String footerSummary() {
+      return response.getFooterSummary();
+    }
+
+    @Override
+    public long startTime() {
+      return response.getStartTime();
+    }
+
+    /** Not supported; rendering uses only the other accessors. */
+    @Override
+    public String executionStatus() {
+      throw new UnsupportedOperationException(
+          "This should never be used for anything. All the required data is available via other methods"
+      );
+    }
+
+    @Override
+    public double progressedPercentage() {
+      return response.getProgressedPercentage();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index fd948f8..8474a87 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -64,6 +64,11 @@
<artifactId>orc-core</artifactId>
</dependency>
<dependency>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ <version>${jline.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<version>${jetty.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
new file mode 100644
index 0000000..bfdb4fa
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
@@ -0,0 +1,202 @@
+package org.apache.hadoop.hive.common.log;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import jline.TerminalFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.fusesource.jansi.Ansi;
+
+import javax.annotation.Nullable;
+import java.io.PrintStream;
+import java.io.StringWriter;
+import java.text.DecimalFormat;
+import java.util.List;
+
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.*;
+
+/**
+ * Renders information from ProgressMonitor to the stream provided.
+ */
+/**
+ * Renders information from ProgressMonitor to the stream provided.
+ *
+ * <p>Rendering relies on ANSI escape sequences to erase lines and move the cursor, so callers
+ * should check {@link #canRenderInPlace(HiveConf)} before using it on a console.
+ */
+public class InPlaceUpdate {
+
+  public static final int MIN_TERMINAL_WIDTH = 94;
+
+  // Keep every formatted line within MIN_TERMINAL_WIDTH chars; the header/vertex formats below
+  // currently produce 87-char lines. If more columns need to be added, update MIN_TERMINAL_WIDTH
+  // (and with it the SEPARATOR width) accordingly.
+  private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s ";
+  private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s ";
+  private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
+
+  private static final int PROGRESS_BAR_CHARS = 30;
+  private static final String SEPARATOR = new String(new char[MIN_TERMINAL_WIDTH]).replace("\0", "-");
+
+  /* Pretty print the values */
+  private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+  // Lines printed since the last repositionCursor(); that is how far the cursor moves back up.
+  private int lines = 0;
+  private final PrintStream out;
+
+  public InPlaceUpdate(PrintStream out) {
+    this.out = out;
+  }
+
+  /** Renders to {@link System#out}. */
+  public InPlaceUpdate() {
+    this(System.out);
+  }
+
+  /** Erases the current line of {@code out}, prints {@code line} plus a newline, and flushes. */
+  public static void reprintLine(PrintStream out, String line) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+  }
+
+  /**
+   * Writes an ANSI cursor-up sequence with a count of 0 to {@code ps} and flushes it.
+   * NOTE(review): carried over unchanged from the old InPlaceUpdates; a count of 0 looks like it
+   * moves nothing. Confirm the intent before relying on it.
+   */
+  public static void rePositionCursor(PrintStream ps) {
+    ps.print(ansi().cursorUp(0).toString());
+    ps.flush();
+  }
+
+  /**
+   * NOTE: Use this method only if canRenderInPlace is true.
+   * Erases the current line and prints the given line.
+   *
+   * @param line - line to print
+   */
+  private void reprintLine(String line) {
+    reprintLine(out, line);
+    lines++;
+  }
+
+  /**
+   * NOTE: Use this method only if canRenderInPlace is true.
+   * Erases the current line and prints the given line in bold with the specified color.
+   *
+   * @param line - line to print
+   * @param color - color for the line
+   */
+  private void reprintLineWithColorAsBold(String line, Ansi.Color color) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
+        .toString());
+    out.flush();
+    lines++;
+  }
+
+  /**
+   * NOTE: Use this method only if canRenderInPlace is true.
+   * Erases the current line and prints the given multiline. Make sure the specified line is not
+   * terminated by a linebreak, otherwise the line count used for repositioning is off.
+   *
+   * @param line - line to print
+   */
+  private void reprintMultiLine(String line) {
+    int numLines = line.split("\r\n|\r|\n").length;
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+    lines += numLines;
+  }
+
+  /**
+   * NOTE: Use this method only if canRenderInPlace is true.
+   * Repositions the cursor back to line 0 of the previously rendered table.
+   */
+  private void repositionCursor() {
+    if (lines > 0) {
+      out.print(ansi().cursorUp(lines).toString());
+      out.flush();
+      lines = 0;
+    }
+  }
+
+  // [==================>>-----]
+  private String getInPlaceProgressBar(double percent) {
+    // Clamp to [0, 1] so the bar is always exactly PROGRESS_BAR_CHARS wide. An out-of-range
+    // fraction would otherwise lengthen the bar and push the footer past the terminal width.
+    double fraction = Math.max(0.0, Math.min(1.0, percent));
+    StringWriter bar = new StringWriter();
+    bar.append("[");
+    // 4 chars are taken by "[", ">>" and "]"
+    int remainingChars = PROGRESS_BAR_CHARS - 4;
+    int completed = (int) (remainingChars * fraction);
+    int pending = remainingChars - completed;
+    for (int i = 0; i < completed; i++) {
+      bar.append("=");
+    }
+    bar.append(">>");
+    for (int i = 0; i < pending; i++) {
+      bar.append("-");
+    }
+    bar.append("]");
+    return bar.toString();
+  }
+
+  /**
+   * Draws the progress table for {@code monitor}, moving the cursor up over the table drawn by
+   * the previous call on this instance and overwriting it. Does nothing if {@code monitor} is
+   * {@code null}.
+   *
+   * <p>NOTE(review): HEADER_FORMAT and VERTEX_FORMAT consume 9 values each, so headers() and every
+   * row must supply at least 9 entries or String.format throws MissingFormatArgumentException.
+   */
+  public void render(ProgressMonitor monitor) {
+    if (monitor == null) {
+      return;
+    }
+    // position the cursor to line 0
+    repositionCursor();
+
+    // print header
+    // -------------------------------------------------------------------------------
+    // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
+    // -------------------------------------------------------------------------------
+    reprintLine(SEPARATOR);
+    reprintLineWithColorAsBold(String.format(HEADER_FORMAT, monitor.headers().toArray()),
+        Ansi.Color.CYAN);
+    reprintLine(SEPARATOR);
+
+    // Map 1 .......... container SUCCEEDED 7 7 0 0 0 0
+    List<String> printReady = Lists.transform(monitor.rows(), new Function<List<String>, String>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable List<String> row) {
+        return String.format(VERTEX_FORMAT, row.toArray());
+      }
+    });
+    reprintMultiLine(StringUtils.join(printReady, "\n"));
+
+    // -------------------------------------------------------------------------------
+    // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s
+    // -------------------------------------------------------------------------------
+    String progressStr = "" + (int) (monitor.progressedPercentage() * 100) + "%";
+    // startTime() is compared with System.currentTimeMillis(), i.e. epoch milliseconds
+    float et = (float) (System.currentTimeMillis() - monitor.startTime()) / (float) 1000;
+    String elapsedTime = "ELAPSED TIME: " + secondsFormatter.format(et) + " s";
+    String footer = String.format(
+        FOOTER_FORMAT,
+        monitor.footerSummary(),
+        getInPlaceProgressBar(monitor.progressedPercentage()),
+        progressStr,
+        elapsedTime);
+
+    reprintLineWithColorAsBold(footer, Ansi.Color.RED);
+    reprintLine(SEPARATOR);
+  }
+
+  /**
+   * Returns whether progress can be rendered in place. All of the following must hold:
+   * <ul>
+   *   <li>the in-place progress flag of the configured execution engine is enabled (tez or
+   *   spark; any other engine yields {@code false});</li>
+   *   <li>stdout and stderr are terminals on a non-Windows OS;</li>
+   *   <li>the terminal is at least {@link #MIN_TERMINAL_WIDTH} characters wide.</li>
+   * </ul>
+   */
+  public static boolean canRenderInPlace(HiveConf conf) {
+    // Read the flag that belongs to the engine running the job. SparkJobMonitor calls this too,
+    // so checking only the Tez flag would silently ignore hive.spark.exec.inplace.progress.
+    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+    boolean inPlaceUpdates = false;
+    if ("tez".equals(engine)) {
+      inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+    } else if ("spark".equals(engine)) {
+      inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS);
+    }
+
+    // we need a terminal at least MIN_TERMINAL_WIDTH chars wide to display in-place updates properly
+    return inPlaceUpdates && isUnixTerminal() && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
+  }
+
+  private static boolean isUnixTerminal() {
+
+    String os = System.getProperty("os.name");
+    if (os.startsWith("Windows")) {
+      // we do not support Windows, we will revisit this if we really need it for windows.
+      return false;
+    }
+
+    // We must be on some unix variant..
+    // check if standard out is a terminal
+    try {
+      // isatty system call will return 1 if the file descriptor is terminal else 0
+      if (isatty(STDOUT_FILENO) == 0) {
+        return false;
+      }
+      if (isatty(STDERR_FILENO) == 0) {
+        return false;
+      }
+    } catch (NoClassDefFoundError | UnsatisfiedLinkError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
new file mode 100644
index 0000000..ee02ccb
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
@@ -0,0 +1,51 @@
+package org.apache.hadoop.hive.common.log;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Read-only view of job progress, as consumed by {@link InPlaceUpdate#render(ProgressMonitor)}.
+ */
+public interface ProgressMonitor {
+
+  /** Column headers of the progress table. */
+  List<String> headers();
+
+  /** Table rows; each row's values are expected to line up with {@link #headers()}. */
+  List<List<String>> rows();
+
+  /** Text printed at the start of the footer line. */
+  String footerSummary();
+
+  /** Start time in epoch milliseconds, used to compute the elapsed time. */
+  long startTime();
+
+  /** Overall execution status, as text. */
+  String executionStatus();
+
+  /** Fraction of work completed; rendered as a percentage by multiplying by 100. */
+  double progressedPercentage();
+
+  /** Monitor with no headers or rows, empty strings, and zero for the numeric values. */
+  ProgressMonitor NULL = new ProgressMonitor() {
+    @Override
+    public List<String> headers() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public List<List<String>> rows() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public String footerSummary() {
+      return "";
+    }
+
+    @Override
+    public long startTime() {
+      return 0L;
+    }
+
+    @Override
+    public String executionStatus() {
+      return "";
+    }
+
+    @Override
+    public double progressedPercentage() {
+      return 0.0;
+    }
+  };
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb27cd6..f3b01b2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2841,7 +2841,12 @@ public class HiveConf extends Configuration {
TEZ_EXEC_INPLACE_PROGRESS(
"hive.tez.exec.inplace.progress",
true,
- "Updates tez job execution progress in-place in the terminal."),
+ "Updates tez job execution progress in-place in the terminal when hive-cli is used."),
+ HIVE_SERVER2_INPLACE_PROGRESS(
+ "hive.server2.in.place.progress",
+ true,
+ "Allows hive server 2 to send progress bar update information. This is currently available"
+ + " only if the execution engine is tez."),
SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
"Updates spark job execution progress in-place in the terminal."),
TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
index b8462c6..830ffc2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
@@ -97,7 +97,7 @@ public class TestOperationLoggingAPIWithMr extends OperationLoggingAPITestBase {
if (System.currentTimeMillis() > pollTimeout) {
break;
}
- opStatus = client.getOperationStatus(operationHandle);
+ opStatus = client.getOperationStatus(operationHandle, false);
Assert.assertNotNull(opStatus);
state = opStatus.getState();
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
index 8b5b516..e98406d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
@@ -50,7 +50,7 @@ public class TestOperationLoggingAPIWithTez extends OperationLoggingAPITestBase
"<PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>",
"<PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>",
"<PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>",
- "from=org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor",
+ "from=org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor",
"org.apache.tez.common.counters.DAGCounter",
"NUM_SUCCEEDED_TASKS",
"TOTAL_LAUNCHED_TASKS",
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index a242501..56860c4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -19,6 +19,7 @@
package org.apache.hive.jdbc;
import org.apache.commons.codec.binary.Base64;
+import org.apache.hive.jdbc.logs.InPlaceUpdateStream;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.rpc.thrift.TCLIService;
@@ -114,6 +115,8 @@ public class HiveStatement implements java.sql.Statement {
private int queryTimeout = 0;
+ private InPlaceUpdateStream inPlaceUpdateStream = InPlaceUpdateStream.NO_OP;
+
public HiveStatement(HiveConnection connection, TCLIService.Iface client,
TSessionHandle sessHandle) {
this(connection, client, sessHandle, false, DEFAULT_FETCH_SIZE);
@@ -342,6 +345,7 @@ public class HiveStatement implements java.sql.Statement {
TGetOperationStatusResp waitForOperationToComplete() throws SQLException {
TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
+ statusReq.setGetProgressUpdate(inPlaceUpdateStream != InPlaceUpdateStream.NO_OP);
TGetOperationStatusResp statusResp = null;
// Poll on the operation status, till the operation is complete
@@ -352,6 +356,7 @@ public class HiveStatement implements java.sql.Statement {
* essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
*/
statusResp = client.GetOperationStatus(statusReq);
+ inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse());
Utils.verifySuccessWithInfo(statusResp.getStatus());
if (statusResp.isSetOperationState()) {
switch (statusResp.getOperationState()) {
@@ -951,4 +956,12 @@ public class HiveStatement implements java.sql.Statement {
}
return null;
}
+
+  /**
+   * Sets the stream on which in-place progress updates are shown. This is only used by the
+   * beeline client.
+   *
+   * @param stream destination for progress updates; {@code null} disables them, exactly like
+   *     {@link InPlaceUpdateStream#NO_OP}
+   */
+  public void setInPlaceUpdateStream(InPlaceUpdateStream stream) {
+    // waitForOperationToComplete() requests progress whenever this field is not NO_OP and then
+    // calls update() on it unconditionally, so a stored null would throw a NullPointerException.
+    this.inPlaceUpdateStream = stream == null ? InPlaceUpdateStream.NO_OP : stream;
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
new file mode 100644
index 0000000..3a682b2
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
@@ -0,0 +1,14 @@
+package org.apache.hive.jdbc.logs;
+
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+
+/**
+ * Receives the progress updates that the server returns while a statement is running.
+ */
+public interface InPlaceUpdateStream {
+  /** Stream that discards every update. */
+  InPlaceUpdateStream NO_OP = new InPlaceUpdateStream() {
+    @Override
+    public void update(TProgressUpdateResp response) {
+      // intentionally empty: updates are ignored
+    }
+  };
+
+  /**
+   * Handles one progress snapshot.
+   *
+   * @param response progress information from the server; implementations should tolerate
+   *     {@code null}
+   */
+  void update(TProgressUpdateResp response);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 84e83ee..1e6ba9a 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -463,11 +463,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>jline</groupId>
- <artifactId>jline</artifactId>
- <version>${jline.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
<version>${tez.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
deleted file mode 100644
index f59d8e2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import static org.fusesource.jansi.Ansi.ansi;
-import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.isatty;
-
-import java.io.PrintStream;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.fusesource.jansi.Ansi;
-
-import jline.TerminalFactory;
-
-public class InPlaceUpdates {
-
- public static final int MIN_TERMINAL_WIDTH = 94;
-
- static boolean isUnixTerminal() {
-
- String os = System.getProperty("os.name");
- if (os.startsWith("Windows")) {
- // we do not support Windows, we will revisit this if we really need it for windows.
- return false;
- }
-
- // We must be on some unix variant..
- // check if standard out is a terminal
- try {
- // isatty system call will return 1 if the file descriptor is terminal else 0
- if (isatty(STDOUT_FILENO) == 0) {
- return false;
- }
- if (isatty(STDERR_FILENO) == 0) {
- return false;
- }
- } catch (NoClassDefFoundError ignore) {
- // These errors happen if the JNI lib is not available for your platform.
- return false;
- } catch (UnsatisfiedLinkError ignore) {
- // These errors happen if the JNI lib is not available for your platform.
- return false;
- }
- return true;
- }
-
- public static boolean inPlaceEligible(HiveConf conf) {
- String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
- boolean inPlaceUpdates = false;
- if (engine.equals("tez")) {
- inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
- }
- if (engine.equals("spark")) {
- inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS);
- }
-
- // we need at least 80 chars wide terminal to display in-place updates properly
- return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
- && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
- }
-
- public static void reprintLine(PrintStream out, String line) {
- out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
- out.flush();
- }
-
- public static void rePositionCursor(PrintStream ps) {
- ps.print(ansi().cursorUp(0).toString());
- ps.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 7be628e..247d589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -38,7 +38,6 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor;
import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index d5b9b5d..cf0162d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hive.ql.exec.spark.status;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.fusesource.jansi.Ansi;
@@ -82,7 +82,7 @@ abstract class SparkJobMonitor {
protected SparkJobMonitor(HiveConf hiveConf) {
monitorTimeoutInterval = hiveConf.getTimeVar(
HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
- inPlaceUpdate = InPlaceUpdates.inPlaceEligible(hiveConf);
+ inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent();
console = SessionState.getConsole();
out = SessionState.LogHelper.getInfoStream();
}
@@ -270,7 +270,7 @@ abstract class SparkJobMonitor {
}
private void reprintLine(String line) {
- InPlaceUpdates.reprintLine(out, line);
+ InPlaceUpdate.reprintLine(out, line);
lines++;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
index a3fc815..a544b93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
@@ -18,10 +18,11 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Method;
+
/**
* TezJobExecHelper is a utility to safely call Tez functionality from
* common code paths. It will check if tez is available/installed before
@@ -37,7 +38,7 @@ public class TezJobExecHelper {
// we have tez installed
ClassLoader classLoader = TezJobExecHelper.class.getClassLoader();
- Method method = classLoader.loadClass("org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor")
+ Method method = classLoader.loadClass("org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor")
.getMethod("killRunningJobs");
method.invoke(null, null);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
deleted file mode 100644
index bd935d4..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ /dev/null
@@ -1,1016 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.tez;
-
-import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
-import static org.fusesource.jansi.Ansi.ansi;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
-import org.apache.hadoop.hive.ql.exec.MapOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hive.common.util.ShutdownHookManager;
-import org.apache.tez.common.counters.FileSystemCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.client.VertexStatus;
-import org.fusesource.jansi.Ansi;
-
-import com.google.common.base.Preconditions;
-
-/**
- * TezJobMonitor keeps track of a tez job while it's being executed. It will
- * print status to the console and retrieve final status of the job after
- * completion.
- */
-public class TezJobMonitor {
-
- private static final String CLASS_NAME = TezJobMonitor.class.getName();
-
- private static final int COLUMN_1_WIDTH = 16;
- private static final int SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH;
- private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH + 34;
- private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-");
- private static final String FILE_HEADER_SEPARATOR =
- new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-");
- private static final String QUERY_EXEC_SUMMARY_HEADER = "Query Execution Summary";
- private static final String TASK_SUMMARY_HEADER = "Task Execution Summary";
- private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
- private static final String FS_COUNTERS_SUMMARY_HEADER = "FileSystem Counters Summary";
-
- // keep this within 80 chars width. If more columns needs to be added then update min terminal
- // width requirement and SEPARATOR width accordingly
- private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s %6s ";
- private static final String VERTEX_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s %6s ";
- private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
- private static final String HEADER = String.format(HEADER_FORMAT,
- "VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");
-
- // method and dag summary format
- private static final String SUMMARY_HEADER_FORMAT = "%10s %14s %13s %12s %14s %15s";
- private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT,
- "VERTICES", "DURATION(ms)", "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS");
-
- // used when I/O redirection is used
- private static final String FILE_HEADER_FORMAT = "%10s %12s %16s %13s %14s %13s %12s %14s %15s";
- private static final String FILE_HEADER = String.format(FILE_HEADER_FORMAT,
- "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION(ms)",
- "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS");
-
- // LLAP counters
- private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s";
- private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
- "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
- "ALLOCATION", "USED", "TOTAL_IO");
-
- // FileSystem counters
- private static final String FS_COUNTERS_HEADER_FORMAT = "%10s %15s %13s %18s %18s %13s";
-
- // Methods summary
- private static final String OPERATION_SUMMARY = "%-35s %9s";
- private static final String OPERATION = "OPERATION";
- private static final String DURATION = "DURATION";
-
- // in-place progress update related variables
- private int lines;
- private final PrintStream out;
-
- private transient LogHelper console;
- private final PerfLogger perfLogger = SessionState.getPerfLogger();
- private final int checkInterval = 200;
- private final int maxRetryInterval = 2500;
- private final int printInterval = 3000;
- private final int progressBarChars = 30;
- private long lastPrintTime;
- private Set<String> completed;
-
- /* Pretty print the values */
- private final NumberFormat secondsFormat;
- private final NumberFormat commaFormat;
- private static final List<DAGClient> shutdownList;
- private final Map<String, BaseWork> workMap;
-
- private StringBuffer diagnostics;
-
- static {
- shutdownList = new LinkedList<DAGClient>();
- ShutdownHookManager.addShutdownHook(new Runnable() {
- @Override
- public void run() {
- TezJobMonitor.killRunningJobs();
- try {
- TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
- } catch (Exception e) {
- // ignore
- }
- }
- });
- }
-
- public static void initShutdownHook() {
- Preconditions.checkNotNull(shutdownList,
- "Shutdown hook was not properly initialized");
- }
-
- public TezJobMonitor(Map<String, BaseWork> workMap) {
- this.workMap = workMap;
- console = SessionState.getConsole();
- secondsFormat = new DecimalFormat("#0.00");
- commaFormat = NumberFormat.getNumberInstance(Locale.US);
- // all progress updates are written to info stream and log file. In-place updates can only be
- // done to info stream (console)
- out = console.getInfoStream();
- }
-
- /**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Erases the current line and prints the given line.
- * @param line - line to print
- */
- public void reprintLine(String line) {
- InPlaceUpdates.reprintLine(out, line);
- lines++;
- }
-
- /**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Erases the current line and prints the given line with the specified color.
- * @param line - line to print
- * @param color - color for the line
- */
- public void reprintLineWithColorAsBold(String line, Ansi.Color color) {
- out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
- .toString());
- out.flush();
- lines++;
- }
-
- /**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Erases the current line and prints the given multiline. Make sure the specified line is not
- * terminated by linebreak.
- * @param line - line to print
- */
- public void reprintMultiLine(String line) {
- int numLines = line.split("\r\n|\r|\n").length;
- out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
- out.flush();
- lines += numLines;
- }
-
- /**
- * NOTE: Use this method only if isUnixTerminal is true.
- * Repositions the cursor back to line 0.
- */
- public void repositionCursor() {
- if (lines > 0) {
- out.print(ansi().cursorUp(lines).toString());
- out.flush();
- lines = 0;
- }
- }
-
- /**
- * monitorExecution handles status printing, failures during execution and final status retrieval.
- *
- * @param dagClient client that was used to kick off the job
- * @param conf configuration file for this operation
- * @return int 0 - success, 1 - killed, 2 - failed
- */
- public int monitorExecution(final DAGClient dagClient, HiveConf conf,
- DAG dag, Context ctx) throws InterruptedException {
- long monitorStartTime = System.currentTimeMillis();
- DAGStatus status = null;
- completed = new HashSet<String>();
- diagnostics = new StringBuffer();
-
- boolean running = false;
- boolean done = false;
- boolean success = false;
- int failedCounter = 0;
- int rc = 0;
- DAGStatus.State lastState = null;
- String lastReport = null;
- Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
- long startTime = 0;
- boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
- Utilities.isPerfOrAboveLogging(conf);
- boolean llapIoEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED, false);
-
- boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf);
- synchronized(shutdownList) {
- shutdownList.add(dagClient);
- }
- console.printInfo("\n");
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- Map<String, Progress> progressMap = null;
- while (true) {
-
- try {
- if (ctx != null) {
- ctx.checkHeartbeaterLockException();
- }
-
- status = dagClient.getDAGStatus(opts, checkInterval);
- progressMap = status.getVertexProgress();
- DAGStatus.State state = status.getState();
-
- if (state != lastState || state == RUNNING) {
- lastState = state;
-
- switch (state) {
- case SUBMITTED:
- console.printInfo("Status: Submitted");
- break;
- case INITING:
- console.printInfo("Status: Initializing");
- startTime = System.currentTimeMillis();
- break;
- case RUNNING:
- if (!running) {
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
- startTime = System.currentTimeMillis();
- running = true;
- }
-
- if (inPlaceEligible) {
- printStatusInPlace(progressMap, startTime, false, dagClient);
- // log the progress report to log file as well
- lastReport = logStatus(progressMap, lastReport, console);
- } else {
- lastReport = printStatus(progressMap, lastReport, console);
- }
- break;
- case SUCCEEDED:
- if (!running) {
- startTime = monitorStartTime;
- }
- if (inPlaceEligible) {
- printStatusInPlace(progressMap, startTime, false, dagClient);
- // log the progress report to log file as well
- lastReport = logStatus(progressMap, lastReport, console);
- } else {
- lastReport = printStatus(progressMap, lastReport, console);
- }
- success = true;
- running = false;
- done = true;
- break;
- case KILLED:
- if (!running) {
- startTime = monitorStartTime;
- }
- if (inPlaceEligible) {
- printStatusInPlace(progressMap, startTime, true, dagClient);
- // log the progress report to log file as well
- lastReport = logStatus(progressMap, lastReport, console);
- }
- console.printInfo("Status: Killed");
- running = false;
- done = true;
- rc = 1;
- break;
- case FAILED:
- case ERROR:
- if (!running) {
- startTime = monitorStartTime;
- }
- if (inPlaceEligible) {
- printStatusInPlace(progressMap, startTime, true, dagClient);
- // log the progress report to log file as well
- lastReport = logStatus(progressMap, lastReport, console);
- }
- console.printError("Status: Failed");
- running = false;
- done = true;
- rc = 2;
- break;
- }
- }
- } catch (Exception e) {
- console.printInfo("Exception: " + e.getMessage());
- boolean isInterrupted = hasInterruptedException(e);
- if (isInterrupted || (++failedCounter % maxRetryInterval / checkInterval == 0)) {
- try {
- console.printInfo("Killing DAG...");
- dagClient.tryKillDAG();
- } catch (IOException io) {
- // best effort
- } catch (TezException te) {
- // best effort
- }
- e.printStackTrace();
- console.printError("Execution has failed.");
- rc = 1;
- done = true;
- } else {
- console.printInfo("Retrying...");
- }
- } finally {
- if (done) {
- if (rc != 0 && status != null) {
- for (String diag : status.getDiagnostics()) {
- console.printError(diag);
- diagnostics.append(diag);
- }
- }
- synchronized(shutdownList) {
- shutdownList.remove(dagClient);
- }
- break;
- }
- }
- }
-
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
-
- if (isProfileEnabled && success && progressMap != null) {
-
- double duration = (System.currentTimeMillis() - startTime) / 1000.0;
- console.printInfo("Status: DAG finished successfully in "
- + String.format("%.2f seconds", duration));
- console.printInfo("");
-
- console.printInfo(QUERY_EXEC_SUMMARY_HEADER);
- printQueryExecutionBreakDown();
- console.printInfo(SEPARATOR);
- console.printInfo("");
-
- console.printInfo(TASK_SUMMARY_HEADER);
- printDagSummary(progressMap, console, dagClient, conf, dag, inPlaceEligible);
- if (inPlaceEligible) {
- console.printInfo(SEPARATOR);
- } else {
- console.printInfo(FILE_HEADER_SEPARATOR);
- }
-
- if (llapIoEnabled) {
- console.printInfo("");
- console.printInfo(LLAP_IO_SUMMARY_HEADER);
- printLlapIOSummary(progressMap, console, dagClient);
- console.printInfo(SEPARATOR);
- console.printInfo("");
-
- console.printInfo(FS_COUNTERS_SUMMARY_HEADER);
- printFSCountersSummary(progressMap, console, dagClient);
- }
-
- console.printInfo("");
- }
-
- return rc;
- }
-
- private static boolean hasInterruptedException(Throwable e) {
- // Hadoop IPC wraps InterruptedException. GRRR.
- while (e != null) {
- if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
- return true;
- }
- e = e.getCause();
- }
- return false;
- }
-
- /**
- * killRunningJobs tries to terminate execution of all
- * currently running tez queries. No guarantees, best effort only.
- */
- public static void killRunningJobs() {
- synchronized (shutdownList) {
- for (DAGClient c : shutdownList) {
- try {
- System.err.println("Trying to shutdown DAG");
- c.tryKillDAG();
- } catch (Exception e) {
- // ignore
- }
- }
- }
- }
-
- private static long getCounterValueByGroupName(TezCounters vertexCounters,
- String groupNamePattern,
- String counterName) {
- TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
- return (tezCounter == null) ? 0 : tezCounter.getValue();
- }
-
- private void printQueryExecutionBreakDown() {
-
- /* Build the method summary header */
- String execBreakdownHeader = String.format(OPERATION_SUMMARY, OPERATION, DURATION);
- console.printInfo(SEPARATOR);
- reprintLineWithColorAsBold(execBreakdownHeader, Ansi.Color.CYAN);
- console.printInfo(SEPARATOR);
-
- // parse, analyze, optimize and compile
- long compile = perfLogger.getEndTime(PerfLogger.COMPILE) -
- perfLogger.getStartTime(PerfLogger.COMPILE);
- console.printInfo(String.format(OPERATION_SUMMARY, "Compile Query",
- secondsFormat.format(compile / 1000.0) + "s"));
-
- // prepare plan for submission (building DAG, adding resources, creating scratch dirs etc.)
- long totalDAGPrep = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG) -
- perfLogger.getEndTime(PerfLogger.COMPILE);
- console.printInfo(String.format(OPERATION_SUMMARY, "Prepare Plan",
- secondsFormat.format(totalDAGPrep / 1000.0) + "s"));
-
- // submit to accept dag (if session is closed, this will include re-opening of session time,
- // localizing files for AM, submitting DAG)
- long submitToAccept = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) -
- perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG);
- console.printInfo(String.format(OPERATION_SUMMARY, "Submit Plan",
- secondsFormat.format(submitToAccept / 1000.0) + "s"));
-
- // accept to start dag (schedule wait time, resource wait time etc.)
- long acceptToStart = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- console.printInfo(String.format(OPERATION_SUMMARY, "Start DAG",
- secondsFormat.format(acceptToStart / 1000.0) + "s"));
-
- // time to actually run the dag (actual dag runtime)
- final long startToEnd;
- if (acceptToStart == 0) {
- startToEnd = perfLogger.getDuration(PerfLogger.TEZ_RUN_DAG);
- } else {
- startToEnd = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG) -
- perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- }
- console.printInfo(String.format(OPERATION_SUMMARY, "Run DAG",
- secondsFormat.format(startToEnd / 1000.0) + "s"));
-
- }
-
- private void printDagSummary(Map<String, Progress> progressMap, LogHelper console,
- DAGClient dagClient, HiveConf conf, DAG dag, final boolean inPlaceEligible) {
-
- /* Strings for headers and counters */
- String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
- Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- TezCounters hiveCounters = null;
- try {
- hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
-
- /* If the counters are missing there is no point trying to print progress */
- if (hiveCounters == null) {
- return;
- }
-
- /* Print the per Vertex summary */
- if (inPlaceEligible) {
- console.printInfo(SEPARATOR);
- reprintLineWithColorAsBold(SUMMARY_HEADER, Ansi.Color.CYAN);
- console.printInfo(SEPARATOR);
- } else {
- console.printInfo(FILE_HEADER_SEPARATOR);
- reprintLineWithColorAsBold(FILE_HEADER, Ansi.Color.CYAN);
- console.printInfo(FILE_HEADER_SEPARATOR);
- }
- SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
- Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
- statusOptions.add(StatusGetOpts.GET_COUNTERS);
- for (String vertexName : keys) {
- Progress progress = progressMap.get(vertexName);
- if (progress != null) {
- final int totalTasks = progress.getTotalTaskCount();
- final int failedTaskAttempts = progress.getFailedTaskAttemptCount();
- final int killedTaskAttempts = progress.getKilledTaskAttemptCount();
- final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName);
- VertexStatus vertexStatus = null;
- try {
- vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions);
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
-
- if (vertexStatus == null) {
- continue;
- }
-
- Vertex currentVertex = dag.getVertex(vertexName);
- List<Vertex> inputVerticesList = currentVertex.getInputVertices();
- long hiveInputRecordsFromOtherVertices = 0;
- if (inputVerticesList.size() > 0) {
-
- for (Vertex inputVertex : inputVerticesList) {
- String inputVertexName = inputVertex.getName();
- hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters,
- hiveCountersGroup, String.format("%s_",
- ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) +
- inputVertexName.replace(" ", "_"));
-
- hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters,
- hiveCountersGroup, String.format("%s_",
- FileSinkOperator.Counter.RECORDS_OUT.toString()) +
- inputVertexName.replace(" ", "_"));
- }
- }
-
- /*
- * Get the CPU & GC
- *
- * counters org.apache.tez.common.counters.TaskCounter
- * GC_TIME_MILLIS=37712
- * CPU_MILLISECONDS=2774230
- */
- final TezCounters vertexCounters = vertexStatus.getVertexCounters();
- final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters,
- TaskCounter.class.getName(),
- TaskCounter.CPU_MILLISECONDS.name());
-
- final double gcTimeMillis = getCounterValueByGroupName(vertexCounters,
- TaskCounter.class.getName(),
- TaskCounter.GC_TIME_MILLIS.name());
-
- /*
- * Get the HIVE counters
- *
- * HIVE
- * CREATED_FILES=1
- * DESERIALIZE_ERRORS=0
- * RECORDS_IN_Map_1=550076554
- * RECORDS_OUT_INTERMEDIATE_Map_1=854987
- * RECORDS_OUT_Reducer_2=1
- */
-
- final long hiveInputRecords =
- getCounterValueByGroupName(
- hiveCounters,
- hiveCountersGroup,
- String.format("%s_", MapOperator.Counter.RECORDS_IN.toString())
- + vertexName.replace(" ", "_"))
- + hiveInputRecordsFromOtherVertices;
- final long hiveOutputIntermediateRecords =
- getCounterValueByGroupName(
- hiveCounters,
- hiveCountersGroup,
- String.format("%s_", ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString())
- + vertexName.replace(" ", "_"));
- final long hiveOutputRecords =
- getCounterValueByGroupName(
- hiveCounters,
- hiveCountersGroup,
- String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString())
- + vertexName.replace(" ", "_"))
- + hiveOutputIntermediateRecords;
-
- final String vertexExecutionStats;
- if (inPlaceEligible) {
- vertexExecutionStats = String.format(SUMMARY_HEADER_FORMAT,
- vertexName,
- secondsFormat.format((duration)),
- commaFormat.format(cpuTimeMillis),
- commaFormat.format(gcTimeMillis),
- commaFormat.format(hiveInputRecords),
- commaFormat.format(hiveOutputRecords));
- } else {
- vertexExecutionStats = String.format(FILE_HEADER_FORMAT,
- vertexName,
- totalTasks,
- failedTaskAttempts,
- killedTaskAttempts,
- secondsFormat.format((duration)),
- commaFormat.format(cpuTimeMillis),
- commaFormat.format(gcTimeMillis),
- commaFormat.format(hiveInputRecords),
- commaFormat.format(hiveOutputRecords));
- }
- console.printInfo(vertexExecutionStats);
- }
- }
- }
-
- private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console,
- DAGClient dagClient) {
- SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
- Set<StatusGetOpts> statusOptions = new HashSet<>(1);
- statusOptions.add(StatusGetOpts.GET_COUNTERS);
- boolean first = false;
- String counterGroup = LlapIOCounters.class.getName();
- for (String vertexName : keys) {
- // Reducers do not benefit from LLAP IO so no point in printing
- if (vertexName.startsWith("Reducer")) {
- continue;
- }
- TezCounters vertexCounters = null;
- try {
- vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
- .getVertexCounters();
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
- if (vertexCounters != null) {
- final long selectedRowgroups = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name());
- final long metadataCacheHit = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.METADATA_CACHE_HIT.name());
- final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.METADATA_CACHE_MISS.name());
- final long cacheHitBytes = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.CACHE_HIT_BYTES.name());
- final long cacheMissBytes = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.CACHE_MISS_BYTES.name());
- final long allocatedBytes = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.ALLOCATED_BYTES.name());
- final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES.name());
- final long totalIoTime = getCounterValueByGroupName(vertexCounters,
- counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS.name());
-
- if (!first) {
- console.printInfo(SEPARATOR);
- reprintLineWithColorAsBold(LLAP_SUMMARY_HEADER, Ansi.Color.CYAN);
- console.printInfo(SEPARATOR);
- first = true;
- }
-
- String queryFragmentStats = String.format(LLAP_SUMMARY_HEADER_FORMAT,
- vertexName,
- selectedRowgroups,
- metadataCacheHit,
- metadataCacheMiss,
- Utilities.humanReadableByteCount(cacheHitBytes),
- Utilities.humanReadableByteCount(cacheMissBytes),
- Utilities.humanReadableByteCount(allocatedBytes),
- Utilities.humanReadableByteCount(allocatedUsedBytes),
- secondsFormat.format(totalIoTime / 1000_000_000.0) + "s");
- console.printInfo(queryFragmentStats);
- }
- }
- }
-
- private void printFSCountersSummary(Map<String, Progress> progressMap, LogHelper console,
- DAGClient dagClient) {
- SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
- Set<StatusGetOpts> statusOptions = new HashSet<>(1);
- statusOptions.add(StatusGetOpts.GET_COUNTERS);
- // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side
- // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap.
- for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
- final String scheme = statistics.getScheme().toUpperCase();
- final String fsCountersHeader = String.format(FS_COUNTERS_HEADER_FORMAT,
- "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS");
-
- console.printInfo("");
- reprintLineWithColorAsBold("Scheme: " + scheme, Ansi.Color.RED);
- console.printInfo(SEPARATOR);
- reprintLineWithColorAsBold(fsCountersHeader, Ansi.Color.CYAN);
- console.printInfo(SEPARATOR);
-
- for (String vertexName : keys) {
- TezCounters vertexCounters = null;
- try {
- vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
- .getVertexCounters();
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
- if (vertexCounters != null) {
- final String counterGroup = FileSystemCounter.class.getName();
- final long bytesRead = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name());
- final long bytesWritten = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name());
- final long readOps = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name());
- final long largeReadOps = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name());
- final long writeOps = getCounterValueByGroupName(vertexCounters,
- counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name());
-
- String fsCountersSummary = String.format(FS_COUNTERS_HEADER_FORMAT,
- vertexName,
- Utilities.humanReadableByteCount(bytesRead),
- readOps,
- largeReadOps,
- Utilities.humanReadableByteCount(bytesWritten),
- writeOps);
- console.printInfo(fsCountersSummary);
- }
- }
-
- console.printInfo(SEPARATOR);
- }
- }
-
- private void printStatusInPlace(Map<String, Progress> progressMap, long startTime,
- boolean vextexStatusFromAM, DAGClient dagClient) {
- StringBuilder reportBuffer = new StringBuilder();
- int sumComplete = 0;
- int sumTotal = 0;
-
- // position the cursor to line 0
- repositionCursor();
-
- // print header
- // -------------------------------------------------------------------------------
- // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
- // -------------------------------------------------------------------------------
- reprintLine(SEPARATOR);
- reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
- reprintLine(SEPARATOR);
-
- SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
- int idx = 0;
- int maxKeys = keys.size();
- for (String s : keys) {
- idx++;
- Progress progress = progressMap.get(s);
- final int complete = progress.getSucceededTaskCount();
- final int total = progress.getTotalTaskCount();
- final int running = progress.getRunningTaskCount();
- final int failed = progress.getFailedTaskAttemptCount();
- final int pending = progress.getTotalTaskCount() - progress.getSucceededTaskCount() -
- progress.getRunningTaskCount();
- final int killed = progress.getKilledTaskAttemptCount();
-
- // To get vertex status we can use DAGClient.getVertexStatus(), but it will be expensive to
- // get status from AM for every refresh of the UI. Lets infer the state from task counts.
- // Only if DAG is FAILED or KILLED the vertex status is fetched from AM.
- VertexStatus.State vertexState = VertexStatus.State.INITIALIZING;
-
- // INITED state
- if (total > 0) {
- vertexState = VertexStatus.State.INITED;
- sumComplete += complete;
- sumTotal += total;
- }
-
- // RUNNING state
- if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
- vertexState = VertexStatus.State.RUNNING;
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
- }
-
- // SUCCEEDED state
- if (complete == total) {
- vertexState = VertexStatus.State.SUCCEEDED;
- if (!completed.contains(s)) {
- completed.add(s);
-
- /* We may have missed the start of the vertex
- * due to the 3 seconds interval
- */
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
-
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
- }
-
- // DAG might have been killed, lets try to get vertex state from AM before dying
- // KILLED or FAILED state
- if (vextexStatusFromAM) {
- VertexStatus vertexStatus = null;
- try {
- vertexStatus = dagClient.getVertexStatus(s, null);
- } catch (IOException e) {
- // best attempt, shouldn't really kill DAG for this
- } catch (TezException e) {
- // best attempt, shouldn't really kill DAG for this
- }
- if (vertexStatus != null) {
- vertexState = vertexStatus.getState();
- }
- }
-
- // Map 1 .......... container SUCCEEDED 7 7 0 0 0 0
- String nameWithProgress = getNameWithProgress(s, complete, total);
- String mode = getMode(s, workMap);
- String vertexStr = String.format(VERTEX_FORMAT,
- nameWithProgress,
- mode,
- vertexState.toString(),
- total,
- complete,
- running,
- pending,
- failed,
- killed);
- reportBuffer.append(vertexStr);
- if (idx != maxKeys) {
- reportBuffer.append("\n");
- }
- }
-
- reprintMultiLine(reportBuffer.toString());
-
- // -------------------------------------------------------------------------------
- // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s
- // -------------------------------------------------------------------------------
- reprintLine(SEPARATOR);
- final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal;
- String footer = getFooter(keys.size(), completed.size(), progress, startTime);
- reprintLineWithColorAsBold(footer, Ansi.Color.RED);
- reprintLine(SEPARATOR);
- }
-
- /**
- * Returns the mode label printed in each vertex's progress row.
- * Precedence is "uber", then "llap", then "container"; a vertex with no
- * BaseWork entry in workMap is reported as "container".
- */
- private String getMode(String name, Map<String, BaseWork> workMap) {
- String mode = "container";
- BaseWork work = workMap.get(name);
- if (work != null) {
- // uber > llap > container
- if (work.getUberMode()) {
- mode = "uber";
- } else if (work.getLlapMode()) {
- mode = "llap";
- } else {
- mode = "container";
- }
- }
- return mode;
- }
-
- // Map 1 ..........
- /**
- * Builds the first column of a vertex row: the vertex name, a space, then a run
- * of dots whose length is proportional to complete/total over the space left in
- * COLUMN_1_WIDTH. Returns "" when s is null; treats total == 0 as 0% progress.
- *
- * NOTE(review): when s is longer than COLUMN_1_WIDTH the trimmed name is
- * (COLUMN_1_WIDTH - 1) chars plus "..", i.e. one char wider than the column, and
- * spaceRemaining is negative so no progress dots are drawn for that vertex.
- */
- private String getNameWithProgress(String s, int complete, int total) {
- String result = "";
- if (s != null) {
- float percent = total == 0 ? 0.0f : (float) complete / (float) total;
- // lets use the remaining space in column 1 as progress bar
- int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
- String trimmedVName = s;
-
- // if the vertex name is longer than column 1 width, trim it down
- // "Tez Merge File Work" will become "Tez Merge File.."
- if (s.length() > COLUMN_1_WIDTH) {
- trimmedVName = s.substring(0, COLUMN_1_WIDTH - 1);
- trimmedVName = trimmedVName + "..";
- }
-
- result = trimmedVName + " ";
- int toFill = (int) (spaceRemaining * percent);
- for (int i = 0; i < toFill; i++) {
- result += ".";
- }
- }
- return result;
- }
-
- // VERTICES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s
- /**
- * Builds the footer line of the in-place progress display.
- *
- * @param keySize total number of vertices
- * @param completedSize number of vertices seen as completed
- * @param progress overall completion fraction (presumably in [0, 1]); the
- *     percentage shown is truncated, not rounded
- * @param startTime wall-clock start in epoch millis; elapsed time is shown in seconds
- */
- private String getFooter(int keySize, int completedSize, float progress, long startTime) {
- String verticesSummary = String.format("VERTICES: %02d/%02d", completedSize, keySize);
- String progressBar = getInPlaceProgressBar(progress);
- final int progressPercent = (int) (progress * 100);
- String progressStr = "" + progressPercent + "%";
- float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
- String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s";
- String footer = String.format(FOOTER_FORMAT,
- verticesSummary, progressBar, progressStr, elapsedTime);
- return footer;
- }
-
- // [==================>>-----]
- /**
- * Renders an ASCII progress bar: "[" + '=' for the completed share + ">>" +
- * '-' for the remainder + "]". The 4 subtracted from progressBarChars accounts
- * for the two brackets and the ">>" marker, so for percent in [0, 1] the bar is
- * exactly progressBarChars characters wide.
- */
- private String getInPlaceProgressBar(float percent) {
- StringBuilder bar = new StringBuilder("[");
- int remainingChars = progressBarChars - 4;
- int completed = (int) (remainingChars * percent);
- int pending = remainingChars - completed;
- for (int i = 0; i < completed; i++) {
- bar.append("=");
- }
- bar.append(">>");
- for (int i = 0; i < pending; i++) {
- bar.append("-");
- }
- bar.append("]");
- return bar.toString();
- }
-
- /**
- * Builds the one-line status report and prints it via console.printInfo when it
- * differs from lastReport or when printInterval ms have passed since lastPrintTime
- * (which is then refreshed). Returns the report, presumably so the caller can
- * pass it back as lastReport on the next poll.
- */
- private String printStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
- String report = getReport(progressMap);
- if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
- console.printInfo(report);
- lastPrintTime = System.currentTimeMillis();
- }
- return report;
- }
-
- /**
- * Same as printStatus, but emits the report via console.logInfo instead of
- * console.printInfo. Returns the report that was built.
- */
- private String logStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
- String report = getReport(progressMap);
- if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
- console.logInfo(report);
- lastPrintTime = System.currentTimeMillis();
- }
- return report;
- }
-
- /**
- * Builds a compact status line with one tab-terminated entry per vertex, in
- * sorted vertex-name order (c = succeeded, r = running, f = failed attempts,
- * t = total tasks):
- *   "name: -/-"            total task count not known yet (t <= 0)
- *   "name: c(+r,-f)/t"     started but not finished, with failed attempts
- *   "name: c(+r)/t"        started but not finished
- *   "name: c(-f)/t"        not started or finished, with failed attempts
- *   "name: c/t"            otherwise
- * Side effects: adds finished vertices to `completed` and records the
- * TEZ_RUN_VERTEX start/end times in perfLogger.
- */
- private String getReport(Map<String, Progress> progressMap) {
- StringBuilder reportBuffer = new StringBuilder();
-
- SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
- for (String s: keys) {
- Progress progress = progressMap.get(s);
- final int complete = progress.getSucceededTaskCount();
- final int total = progress.getTotalTaskCount();
- final int running = progress.getRunningTaskCount();
- final int failed = progress.getFailedTaskAttemptCount();
- if (total <= 0) {
- reportBuffer.append(String.format("%s: -/-\t", s));
- } else {
- // first time this vertex is seen fully complete: close its perf timer
- if (complete == total && !completed.contains(s)) {
- completed.add(s);
-
- /*
- * We may have missed the start of the vertex due to the 3 seconds interval
- */
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
-
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
- if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
-
- // start the vertex timer on the first poll that shows activity
- if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
-
- /* vertex is started, but not complete */
- if (failed > 0) {
- reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
- } else {
- reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
- }
- } else {
- /* vertex is waiting for input/slots or complete */
- if (failed > 0) {
- /* tasks finished but some failed */
- reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
- } else {
- reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
- }
- }
- }
- }
-
- return reportBuffer.toString();
- }
-
- /** Returns the diagnostics collected by this monitor, as diagnostics.toString(). */
- public String getDiagnostics() {
- return diagnostics.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index f1071fa..62f65c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -17,9 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-
import java.util.Collection;
-
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -40,9 +38,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
-
import javax.security.auth.login.LoginException;
-
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
@@ -83,6 +79,7 @@ import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
/**
* Holds session state related to Tez
@@ -671,7 +668,7 @@ public class TezSessionState {
}
public List<LocalResource> getLocalizedResources() {
- return new ArrayList<LocalResource>(localizedResources);
+ return new ArrayList<>(localizedResources);
}
public String getUser() {
@@ -698,4 +695,5 @@ public class TezSessionState {
}
} while (!ownerThread.compareAndSet(null, newName));
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 7479b85..69cbe0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -79,6 +79,7 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.json.JSONObject;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
/**
*
@@ -178,8 +179,9 @@ public class TezTask extends Task<TezWork> {
additionalLr, inputOutputJars, inputOutputLocalResources);
// finally monitor will print progress until the job is done
- TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
- rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
+ TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap(),dagClient, conf, dag, ctx);
+ rc = monitor.monitorExecution();
+
if (rc != 0) {
this.setException(new HiveException(monitor.getDiagnostics()));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
new file mode 100644
index 0000000..eccbbb6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
@@ -0,0 +1,7 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+
+/**
+ * Constants for the Tez job monitoring classes in this package.
+ */
+public interface Constants {
+  /**
+   * A line of '-' characters, {@code InPlaceUpdate.MIN_TERMINAL_WIDTH} long, for use as a
+   * horizontal separator in console output.
+   */
+  String SEPARATOR = new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace('\0', '-');
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
new file mode 100644
index 0000000..5840ad6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -0,0 +1,197 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.*;
+
+
+/**
+ * Prints the "Task Execution Summary" table for a Tez DAG: one row per vertex with task counts,
+ * duration, CPU/GC time and Hive input/output record counts.
+ *
+ * <p>Everything here is best effort. DAG-level counters are fetched once in the constructor and
+ * per-vertex status is fetched while printing; if either call fails, the table (or the affected
+ * row) is skipped instead of failing the query.
+ */
+class DAGSummary implements PrintSummary {
+
+  private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdate.MIN_TERMINAL_WIDTH + 34;
+  private static final String FILE_HEADER_SEPARATOR =
+      new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-");
+
+  private static final String FORMATTING_PATTERN = "%10s %12s %16s %13s %14s %13s %12s %14s %15s";
+  private static final String FILE_HEADER = String.format(
+      FORMATTING_PATTERN,
+      "VERTICES",
+      "TOTAL_TASKS",
+      "FAILED_ATTEMPTS",
+      "KILLED_TASKS",
+      "DURATION(ms)",
+      "CPU_TIME(ms)",
+      "GC_TIME(ms)",
+      "INPUT_RECORDS",
+      "OUTPUT_RECORDS"
+  );
+
+  // Formats the value shown in the DURATION(ms) column (it is not a seconds value).
+  private final DecimalFormat durationFormatter = new DecimalFormat("#0.00");
+  private final NumberFormat commaFormatter = NumberFormat.getNumberInstance(Locale.US);
+
+  private final String hiveCountersGroup;
+  // DAG-level counters fetched once in the constructor; null if they could not be retrieved.
+  private final TezCounters hiveCounters;
+
+  private final Map<String, Progress> progressMap;
+  private final DAGClient dagClient;
+  private final DAG dag;
+  private final PerfLogger perfLogger;
+
+  /**
+   * @param progressMap per-vertex progress, keyed by vertex name
+   * @param hiveConf    supplies the Hive counter group name (HIVECOUNTERGROUP)
+   * @param dagClient   client used to fetch DAG and vertex counters
+   * @param dag         the submitted DAG, used to resolve each vertex's input vertices
+   * @param perfLogger  supplies the per-vertex TEZ_RUN_VERTEX durations
+   */
+  DAGSummary(Map<String, Progress> progressMap, HiveConf hiveConf, DAGClient dagClient,
+      DAG dag, PerfLogger perfLogger) {
+    this.progressMap = progressMap;
+    this.dagClient = dagClient;
+    this.dag = dag;
+    this.perfLogger = perfLogger;
+    this.hiveCountersGroup = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    this.hiveCounters = fetchDagCounters(dagClient);
+  }
+
+  /**
+   * Computes the records a vertex received from its upstream vertices: the sum of their
+   * RECORDS_OUT_INTERMEDIATE and RECORDS_OUT Hive counters. Returns 0 if the DAG has no vertex
+   * with this name.
+   */
+  private long hiveInputRecordsFromOtherVertices(String vertexName) {
+    Vertex vertex = dag.getVertex(vertexName);
+    if (vertex == null) {
+      return 0;
+    }
+    long result = 0;
+    for (Vertex inputVertex : vertex.getInputVertices()) {
+      String inputName = inputVertex.getName();
+      result += hiveCounterValue(formattedName(
+          ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), inputName));
+      result += hiveCounterValue(formattedName(
+          FileSinkOperator.Counter.RECORDS_OUT.toString(), inputName));
+    }
+    return result;
+  }
+
+  /** Builds a per-vertex counter name, e.g. ("RECORDS_IN", "Map 1") -> "RECORDS_IN_Map_1". */
+  private String formattedName(String counterName, String vertexName) {
+    return counterName + "_" + vertexName.replace(' ', '_');
+  }
+
+  /**
+   * Looks up a counter value, treating a missing counter as 0. A null {@code counters} (for
+   * example, when no counters came back for a vertex) also yields 0 instead of throwing a
+   * NullPointerException.
+   */
+  private long getCounterValueByGroupName(TezCounters counters, String groupName,
+      String counterName) {
+    if (counters == null) {
+      return 0;
+    }
+    TezCounter tezCounter = counters.getGroup(groupName).findCounter(counterName);
+    return (tezCounter == null) ? 0 : tezCounter.getValue();
+  }
+
+  /** Reads a counter from the configured Hive counter group of the DAG-level counters. */
+  private long hiveCounterValue(String counterName) {
+    return getCounterValueByGroupName(hiveCounters, hiveCountersGroup, counterName);
+  }
+
+  /** Fetches the DAG-level counters, or returns null if the status call fails. */
+  private TezCounters fetchDagCounters(DAGClient client) {
+    try {
+      return client.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS)).getDAGCounters();
+    } catch (IOException | TezException ignored) {
+      // Best attempt: a missing summary must not kill the DAG.
+      return null;
+    }
+  }
+
+  /**
+   * Prints the title, then (if DAG counters are available) a header and one row per vertex in
+   * sorted name order. Vertices whose progress or status is unavailable are skipped.
+   */
+  @Override
+  public void print(SessionState.LogHelper console) {
+    console.printInfo("Task Execution Summary");
+
+    // Without the DAG-level counters there is nothing meaningful to tabulate.
+    if (hiveCounters == null) {
+      return;
+    }
+
+    printHeader(console);
+    Set<StatusGetOpts> statusOptions = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+    for (String vertexName : new TreeSet<>(progressMap.keySet())) {
+      Progress progress = progressMap.get(vertexName);
+      if (progress == null) {
+        continue;
+      }
+      VertexStatus vertexStatus = vertexStatus(statusOptions, vertexName);
+      if (vertexStatus == null) {
+        continue;
+      }
+      console.printInfo(vertexSummary(vertexName, progress, vertexStatus));
+    }
+    console.printInfo(FILE_HEADER_SEPARATOR);
+  }
+
+  /** Formats one table row for {@code vertexName} using FORMATTING_PATTERN. */
+  private String vertexSummary(String vertexName, Progress progress, VertexStatus vertexStatus) {
+    // CPU and GC time come from the vertex's Tez task counters
+    // (group org.apache.tez.common.counters.TaskCounter: CPU_MILLISECONDS, GC_TIME_MILLIS).
+    final TezCounters vertexCounters = vertexStatus.getVertexCounters();
+    final String taskCounterGroup = TaskCounter.class.getName();
+    final long cpuTimeMillis = getCounterValueByGroupName(vertexCounters, taskCounterGroup,
+        TaskCounter.CPU_MILLISECONDS.name());
+    final long gcTimeMillis = getCounterValueByGroupName(vertexCounters, taskCounterGroup,
+        TaskCounter.GC_TIME_MILLIS.name());
+
+    // Record counts come from the DAG-level Hive counter group, whose counters carry the vertex
+    // name as a suffix, e.g. RECORDS_IN_Map_1, RECORDS_OUT_INTERMEDIATE_Map_1, RECORDS_OUT_Reducer_2.
+    final long hiveInputRecords =
+        hiveCounterValue(formattedName(MapOperator.Counter.RECORDS_IN.toString(), vertexName))
+            + hiveInputRecordsFromOtherVertices(vertexName);
+    final long hiveOutputRecords =
+        hiveCounterValue(formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(), vertexName))
+            + hiveCounterValue(formattedName(
+                ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), vertexName));
+
+    final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName);
+
+    // NOTE(review): the KILLED_TASKS column is filled from the killed *attempt* count.
+    return String.format(FORMATTING_PATTERN,
+        vertexName,
+        progress.getTotalTaskCount(),
+        progress.getFailedTaskAttemptCount(),
+        progress.getKilledTaskAttemptCount(),
+        durationFormatter.format(duration),
+        commaFormatter.format(cpuTimeMillis),
+        commaFormatter.format(gcTimeMillis),
+        commaFormatter.format(hiveInputRecords),
+        commaFormatter.format(hiveOutputRecords));
+  }
+
+  /** Fetches a vertex's status with the given options, or returns null if the call fails. */
+  private VertexStatus vertexStatus(Set<StatusGetOpts> statusOptions, String vertexName) {
+    try {
+      return dagClient.getVertexStatus(vertexName, statusOptions);
+    } catch (IOException | TezException ignored) {
+      // Best attempt: a missing row must not kill the DAG.
+      return null;
+    }
+  }
+
+  /** Prints the column header framed by separator lines. */
+  private void printHeader(SessionState.LogHelper console) {
+    console.printInfo(FILE_HEADER_SEPARATOR);
+    console.printInfo(FILE_HEADER);
+    console.printInfo(FILE_HEADER_SEPARATOR);
+  }
+}