You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/11/03 03:21:02 UTC
svn commit: r1636230 - in /hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
ql/src/java/org/apache/hadoop/hive/ql/log/
ql/src/java/org/apache/hadoop/hive/ql/session/
Author: gunther
Date: Mon Nov 3 02:21:01 2014
New Revision: 1636230
URL: http://svn.apache.org/r1636230
Log:
HIVE-8495: Add progress bar for Hive on Tez queries (Prasanth J and Mostafa Mokhtar via Gunther Hagleitner)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/pom.xml
hive/trunk/ql/pom.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1636230&r1=1636229&r2=1636230&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Nov 3 02:21:01 2014
@@ -1916,7 +1916,15 @@ public class HiveConf extends Configurat
TEZ_SMB_NUMBER_WAVES(
"hive.tez.smb.number.waves",
(float) 0.5,
- "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave.")
+ "The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave."),
+ TEZ_EXEC_SUMMARY(
+ "hive.tez.exec.print.summary",
+ false,
+ "Display breakdown of execution steps, for every query executed by the shell."),
+ TEZ_EXEC_INPLACE_PROGRESS(
+ "hive.tez.exec.inplace.progress",
+ true,
+ "Updates tez job execution progress in-place in the terminal.")
;
public final String varname;
Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1636230&r1=1636229&r2=1636230&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Mon Nov 3 02:21:01 2014
@@ -129,6 +129,7 @@
<jetty.version>7.6.0.v20120127</jetty.version>
<jersey.version>1.14</jersey.version>
<jline.version>0.9.94</jline.version>
+ <jansi.version>1.11</jansi.version>
<jms.version>1.1</jms.version>
<jodd.version>3.5.2</jodd.version>
<json.version>20090211</json.version>
@@ -336,6 +337,11 @@
<version>${jline.version}</version>
</dependency>
<dependency>
+ <groupId>org.fusesource.jansi</groupId>
+ <artifactId>jansi</artifactId>
+ <version>${jansi.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
Modified: hive/trunk/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/pom.xml?rev=1636230&r1=1636229&r2=1636230&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/trunk/ql/pom.xml Mon Nov 3 02:21:01 2014
@@ -274,6 +274,16 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ <version>${jline.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.fusesource.jansi</groupId>
+ <artifactId>jansi</artifactId>
+ <version>${jansi.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
<version>${tez.version}</version>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java?rev=1636230&r1=1636229&r2=1636230&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java Mon Nov 3 02:21:01 2014
@@ -19,29 +19,48 @@
package org.apache.hadoop.hive.ql.exec.tez;
import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
+import static org.fusesource.jansi.internal.CLibrary.isatty;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Heartbeater;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.fusesource.jansi.Ansi;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import jline.Terminal;
/**
* TezJobMonitor keeps track of a tez job while it's being executed. It will
@@ -50,16 +69,47 @@ import org.apache.tez.dag.api.client.Sta
*/
public class TezJobMonitor {
- private static final Log LOG = LogFactory.getLog(TezJobMonitor.class.getName());
private static final String CLASS_NAME = TezJobMonitor.class.getName();
+ private static final int MIN_TERMINAL_WIDTH = 80;
+ private static final int COLUMN_1_WIDTH = 16;
+ private static final int SEPARATOR_WIDTH = 80;
+
+ // keep this within 80 chars width. If more columns need to be added, then update the min
+ // terminal width requirement and the separator width accordingly
+ private static final String HEADER_FORMAT = "%16s%12s %5s %9s %7s %7s %6s %6s";
+ private static final String VERTEX_FORMAT = "%-16s%12s %5s %9s %7s %7s %6s %6s";
+ private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
+ private static final String HEADER = String.format(HEADER_FORMAT,
+ "VERTICES", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");
+
+ // method and dag summary format
+ private static final String SUMMARY_HEADER_FORMAT = "%-16s %-12s %-12s %-15s %-20s %-15s %-15s %-15s %-15s";
+ private static final String SUMMARY_VERTEX_FORMAT = "%-12s %11s %13s %12s %19s %19s %13s %15s %16s";
+ private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT,
+ "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION_SECONDS",
+ "CPU_TIME_MILLIS", "GC_TIME_MILLIS", "INPUT_RECORDS", "OUTPUT_RECORDS");
+
+ private static final String TOTAL_PREP_TIME = "TotalPrepTime";
+ private static final String METHOD = "METHOD";
+ private static final String DURATION = "DURATION(ms)";
+
+ // in-place progress update related variables
+ private int lines;
+ private PrintStream out;
+ private String separator;
private transient LogHelper console;
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private final int checkInterval = 200;
private final int maxRetryInterval = 2500;
private final int printInterval = 3000;
+ private final int progressBarChars = 30;
private long lastPrintTime;
private Set<String> completed;
+
+ /* Pretty print the values */
+ private final NumberFormat secondsFormat;
+ private final NumberFormat commaFormat;
private static final List<DAGClient> shutdownList;
static {
@@ -83,20 +133,111 @@ public class TezJobMonitor {
}
public TezJobMonitor() {
- console = new LogHelper(LOG);
+ console = SessionState.getConsole();
+ secondsFormat = new DecimalFormat("#0.00");
+ commaFormat = NumberFormat.getNumberInstance(Locale.US);
+ // all progress updates are written to info stream and log file. In-place updates can only be
+ // done to info stream (console)
+ out = console.getInfoStream();
+ separator = "";
+ for (int i = 0; i < SEPARATOR_WIDTH; i++) {
+ separator += "-";
+ }
+ }
+
+ private static boolean isUnixTerminal() {
+
+ String os = System.getProperty("os.name");
+ if (os.startsWith("Windows")) {
+ // we do not support Windows, we will revisit this if we really need it for windows.
+ return false;
+ }
+
+ // We must be on some unix variant..
+ // check if standard out is a terminal
+ try {
+ // the isatty system call returns 1 if the file descriptor is a terminal, else 0
+ if (isatty(STDOUT_FILENO) == 0) {
+ return false;
+ }
+ } catch (NoClassDefFoundError ignore) {
+ // These errors happen if the JNI lib is not available for your platform.
+ return false;
+ } catch (UnsatisfiedLinkError ignore) {
+ // These errors happen if the JNI lib is not available for your platform.
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * NOTE: Use this method only if isUnixTerminal is true.
+ * Erases the current line and prints the given line.
+ * @param line - line to print
+ */
+ public void reprintLine(String line) {
+ out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+ out.flush();
+ lines++;
+ }
+
+ /**
+ * NOTE: Use this method only if isUnixTerminal is true.
+ * Erases the current line and prints the given line with the specified color.
+ * @param line - line to print
+ * @param color - color for the line
+ */
+ public void reprintLineWithColorAsBold(String line, Ansi.Color color) {
+ out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
+ .toString());
+ out.flush();
+ lines++;
+ }
+
+ /**
+ * NOTE: Use this method only if isUnixTerminal is true.
+ * Erases the current line and prints the given multi-line text. Make sure the specified text is
+ * not terminated by a line break.
+ * @param line - line to print
+ */
+ public void reprintMultiLine(String line) {
+ int numLines = line.split("\r\n|\r|\n").length;
+ out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+ out.flush();
+ lines += numLines;
+ }
+
+ /**
+ * NOTE: Use this method only if isUnixTerminal is true.
+ * Repositions the cursor back to line 0.
+ */
+ public void repositionCursor() {
+ if (lines > 0) {
+ out.print(ansi().cursorUp(lines).toString());
+ out.flush();
+ lines = 0;
+ }
}
/**
- * monitorExecution handles status printing, failures during execution and final
- * status retrieval.
+ * NOTE: Use this method only if isUnixTerminal is true.
+ * Gets the width of the terminal
+ * @return - width of terminal
+ */
+ public int getTerminalWidth() {
+ return Terminal.getTerminal().getTerminalWidth();
+ }
+
+ /**
+ * monitorExecution handles status printing, failures during execution and final status retrieval.
*
* @param dagClient client that was used to kick off the job
* @param txnMgr transaction manager for this operation
* @param conf configuration file for this operation
* @return int 0 - success, 1 - killed, 2 - failed
*/
- public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr,
- HiveConf conf) throws InterruptedException {
+ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf,
+ DAG dag) throws InterruptedException {
DAGStatus status = null;
completed = new HashSet<String>();
@@ -109,6 +250,22 @@ public class TezJobMonitor {
Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
long startTime = 0;
+ boolean isProfileEnabled = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY);
+ boolean inPlaceUpdates = conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+ boolean wideTerminal = false;
+ boolean isTerminal = inPlaceUpdates == true ? isUnixTerminal() : false;
+
+ // we need at least 80 chars wide terminal to display in-place updates properly
+ if (isTerminal) {
+ if (getTerminalWidth() >= MIN_TERMINAL_WIDTH) {
+ wideTerminal = true;
+ }
+ }
+
+ boolean inPlaceEligible = false;
+ if (inPlaceUpdates && isTerminal && wideTerminal && !console.getIsSilent()) {
+ inPlaceEligible = true;
+ }
shutdownList.add(dagClient);
@@ -116,7 +273,7 @@ public class TezJobMonitor {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
- while(true) {
+ while (true) {
try {
status = dagClient.getDAGStatus(opts);
@@ -127,7 +284,7 @@ public class TezJobMonitor {
if (state != lastState || state == RUNNING) {
lastState = state;
- switch(state) {
+ switch (state) {
case SUBMITTED:
console.printInfo("Status: Submitted");
break;
@@ -138,23 +295,49 @@ public class TezJobMonitor {
if (!running) {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
- for (String s: progressMap.keySet()) {
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
- }
startTime = System.currentTimeMillis();
running = true;
}
- lastReport = printStatus(progressMap, lastReport, console);
+ if (inPlaceEligible) {
+ printStatusInPlace(progressMap, startTime, false, dagClient);
+ // log the progress report to log file as well
+ console.logInfo(getReport(progressMap));
+ } else {
+ lastReport = printStatus(progressMap, lastReport, console);
+ }
break;
case SUCCEEDED:
- lastReport = printStatus(progressMap, lastReport, console);
- double duration = (System.currentTimeMillis() - startTime)/1000.0;
- console.printInfo("Status: Finished successfully in " + String.format("%.2f seconds", duration));
+ if (inPlaceEligible) {
+ printStatusInPlace(progressMap, startTime, false, dagClient);
+ // log the progress report to log file as well
+ console.logInfo(getReport(progressMap));
+ } else {
+ lastReport = printStatus(progressMap, lastReport, console);
+ }
+
+ /* Profile info is collected anyway; isProfileEnabled
+ * decides whether it gets printed or not
+ */
+ if (isProfileEnabled) {
+
+ double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+ console.printInfo("Status: DAG finished successfully in "
+ + String.format("%.2f seconds", duration));
+ console.printInfo("\n");
+
+ printMethodsSummary();
+ printDagSummary(progressMap, console, dagClient, conf, dag);
+ }
running = false;
done = true;
break;
case KILLED:
+ if (inPlaceEligible) {
+ printStatusInPlace(progressMap, startTime, true, dagClient);
+ // log the progress report to log file as well
+ console.logInfo(getReport(progressMap));
+ }
console.printInfo("Status: Killed");
running = false;
done = true;
@@ -162,6 +345,11 @@ public class TezJobMonitor {
break;
case FAILED:
case ERROR:
+ if (inPlaceEligible) {
+ printStatusInPlace(progressMap, startTime, true, dagClient);
+ // log the progress report to log file as well
+ console.logInfo(getReport(progressMap));
+ }
console.printError("Status: Failed");
running = false;
done = true;
@@ -173,15 +361,15 @@ public class TezJobMonitor {
Thread.sleep(checkInterval);
}
} catch (Exception e) {
- console.printInfo("Exception: "+e.getMessage());
- if (++failedCounter % maxRetryInterval/checkInterval == 0
+ console.printInfo("Exception: " + e.getMessage());
+ if (++failedCounter % maxRetryInterval / checkInterval == 0
|| e instanceof InterruptedException) {
try {
console.printInfo("Killing DAG...");
dagClient.tryKillDAG();
- } catch(IOException io) {
+ } catch (IOException io) {
// best effort
- } catch(TezException te) {
+ } catch (TezException te) {
// best effort
}
e.printStackTrace();
@@ -194,7 +382,7 @@ public class TezJobMonitor {
} finally {
if (done) {
if (rc != 0 && status != null) {
- for (String diag: status.getDiagnostics()) {
+ for (String diag : status.getDiagnostics()) {
console.printError(diag);
}
}
@@ -222,7 +410,323 @@ public class TezJobMonitor {
}
}
+ private static long getCounterValueByGroupName(TezCounters vertexCounters,
+ String groupNamePattern,
+ String counterName) {
+ TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
+ return (tezCounter == null) ? 0 : tezCounter.getValue();
+ }
+
+ private void printMethodsSummary() {
+ long totalInPrepTime = 0;
+
+ String[] perfLoggerReportMethods = {
+ (PerfLogger.PARSE),
+ (PerfLogger.ANALYZE),
+ (PerfLogger.TEZ_BUILD_DAG),
+ (PerfLogger.TEZ_SUBMIT_TO_RUNNING)
+ };
+
+ /* Build the method summary header */
+ String methodBreakdownHeader = String.format("%-30s %-13s", METHOD, DURATION);
+ console.printInfo(methodBreakdownHeader);
+
+ for (String method : perfLoggerReportMethods) {
+ long duration = perfLogger.getDuration(method);
+ totalInPrepTime += duration;
+ console.printInfo(String.format("%-30s %11s", method, commaFormat.format(duration)));
+ }
+
+ /*
+ * The methods listed above do not capture the total time from TimeToSubmit.startTime until
+ * TezRunDag.startTime, so calculate the duration and print it.
+ */
+ totalInPrepTime = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) -
+ perfLogger.getStartTime(PerfLogger.TIME_TO_SUBMIT);
+
+ console.printInfo(String.format("%-30s %11s\n", TOTAL_PREP_TIME, commaFormat.format(
+ totalInPrepTime)));
+ }
+
+ private void printDagSummary(Map<String, Progress> progressMap, LogHelper console,
+ DAGClient dagClient, HiveConf conf, DAG dag) {
+
+ /* Strings for headers and counters */
+ String hiveCountersGroup = conf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+ Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+ TezCounters hiveCounters = null;
+ try {
+ hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
+ } catch (IOException e) {
+ // best attempt, shouldn't really kill DAG for this
+ } catch (TezException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+
+ /* If the counters are missing there is no point trying to print progress */
+ if (hiveCounters == null) {
+ return;
+ }
+
+ /* Print the per Vertex summary */
+ console.printInfo(SUMMARY_HEADER);
+ SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+ Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
+ statusOptions.add(StatusGetOpts.GET_COUNTERS);
+ for (String vertexName : keys) {
+ Progress progress = progressMap.get(vertexName);
+ if (progress != null) {
+ final int totalTasks = progress.getTotalTaskCount();
+ final int failedTaskAttempts = progress.getFailedTaskAttemptCount();
+ final int killedTasks = progress.getKilledTaskCount();
+ final double duration =
+ perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName) / 1000.0;
+ VertexStatus vertexStatus = null;
+ try {
+ vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions);
+ } catch (IOException e) {
+ // best attempt, shouldn't really kill DAG for this
+ } catch (TezException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+
+ if (vertexStatus == null) {
+ continue;
+ }
+
+ Vertex currentVertex = dag.getVertex(vertexName);
+ List<Vertex> inputVerticesList = currentVertex.getInputVertices();
+ long hiveInputRecordsFromOtherVertices = 0;
+ if (inputVerticesList.size() > 0) {
+
+ for (Vertex inputVertex : inputVerticesList) {
+ String inputVertexName = inputVertex.getName();
+ hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters,
+ hiveCountersGroup, String.format("%s_",
+ ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) +
+ inputVertexName.replace(" ", "_"));
+
+ hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters,
+ hiveCountersGroup, String.format("%s_",
+ FileSinkOperator.Counter.RECORDS_OUT.toString()) +
+ inputVertexName.replace(" ", "_"));
+ }
+ }
+
+ /*
+ * Get the CPU & GC
+ *
+ * counters org.apache.tez.common.counters.TaskCounter
+ * GC_TIME_MILLIS=37712
+ * CPU_MILLISECONDS=2774230
+ */
+ final TezCounters vertexCounters = vertexStatus.getVertexCounters();
+ final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters,
+ TaskCounter.class.getName(),
+ TaskCounter.CPU_MILLISECONDS.name());
+
+ final double gcTimeMillis = getCounterValueByGroupName(vertexCounters,
+ TaskCounter.class.getName(),
+ TaskCounter.GC_TIME_MILLIS.name());
+
+ /*
+ * Get the HIVE counters
+ *
+ * HIVE
+ * CREATED_FILES=1
+ * DESERIALIZE_ERRORS=0
+ * RECORDS_IN_Map_1=550076554
+ * RECORDS_OUT_INTERMEDIATE_Map_1=854987
+ * RECORDS_OUT_Reducer_2=1
+ */
+ final long hiveInputRecords = getCounterValueByGroupName(hiveCounters, hiveCountersGroup,
+ MapOperator.Counter.RECORDS_IN.toString()) + hiveInputRecordsFromOtherVertices;
+ final long hiveOutputIntermediateRecords = getCounterValueByGroupName(hiveCounters,
+ hiveCountersGroup, ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString());
+ final long hiveOutputRecords = getCounterValueByGroupName(hiveCounters, hiveCountersGroup,
+ FileSinkOperator.Counter.RECORDS_OUT.toString()) + hiveOutputIntermediateRecords;
+
+ String vertexExecutionStats = String.format(SUMMARY_VERTEX_FORMAT,
+ vertexName,
+ totalTasks,
+ failedTaskAttempts,
+ killedTasks,
+ secondsFormat.format((duration)),
+ commaFormat.format(cpuTimeMillis),
+ commaFormat.format(gcTimeMillis),
+ commaFormat.format(hiveInputRecords),
+ commaFormat.format(hiveOutputRecords));
+ console.printInfo(vertexExecutionStats);
+ }
+ }
+ }
+
+ private void printStatusInPlace(Map<String, Progress> progressMap, long startTime,
+ boolean vextexStatusFromAM, DAGClient dagClient) {
+ StringBuffer reportBuffer = new StringBuffer();
+ int sumComplete = 0;
+ int sumTotal = 0;
+
+ // position the cursor to line 0
+ repositionCursor();
+
+ // print header
+ // -------------------------------------------------------------------------------
+ // VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
+ // -------------------------------------------------------------------------------
+ reprintLine(separator);
+ reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
+ reprintLine(separator);
+
+ SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+ int idx = 0;
+ int maxKeys = keys.size();
+ for (String s : keys) {
+ idx++;
+ Progress progress = progressMap.get(s);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ final int running = progress.getRunningTaskCount();
+ final int failed = progress.getFailedTaskAttemptCount();
+ final int pending = progress.getTotalTaskCount() - progress.getSucceededTaskCount() -
+ progress.getRunningTaskCount();
+ final int killed = progress.getKilledTaskCount();
+
+ // To get vertex status we can use DAGClient.getVertexStatus(), but it will be expensive to
+ // get status from AM for every refresh of the UI. Let's infer the state from task counts.
+ // The vertex status is fetched from AM only if the DAG is FAILED or KILLED.
+ VertexStatus.State vertexState = VertexStatus.State.INITIALIZING;
+
+ // INITED state
+ if (total > 0) {
+ vertexState = VertexStatus.State.INITED;
+ sumComplete += complete;
+ sumTotal += total;
+ }
+
+ // RUNNING state
+ if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+ vertexState = VertexStatus.State.RUNNING;
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+ }
+
+ // SUCCEEDED state
+ if (complete == total) {
+ vertexState = VertexStatus.State.SUCCEEDED;
+ if (!completed.contains(s)) {
+ completed.add(s);
+
+ /* We may have missed the start of the vertex
+ * due to the 3 seconds interval
+ */
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+ }
+
+ // DAG might have been killed, let's try to get vertex state from AM before dying
+ // KILLED or FAILED state
+ if (vextexStatusFromAM) {
+ VertexStatus vertexStatus = null;
+ try {
+ vertexStatus = dagClient.getVertexStatus(s, null);
+ } catch (IOException e) {
+ // best attempt, shouldn't really kill DAG for this
+ } catch (TezException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+ if (vertexStatus != null) {
+ vertexState = vertexStatus.getState();
+ }
+ }
+
+ // Map 1 .......... SUCCEEDED 7 7 0 0 0 0
+ String nameWithProgress = getNameWithProgress(s, complete, total);
+ String vertexStr = String.format(VERTEX_FORMAT,
+ nameWithProgress,
+ vertexState.toString(),
+ total,
+ complete,
+ running,
+ pending,
+ failed,
+ killed);
+ reportBuffer.append(vertexStr);
+ if (idx != maxKeys) {
+ reportBuffer.append("\n");
+ }
+ }
+
+ reprintMultiLine(reportBuffer.toString());
+
+ // -------------------------------------------------------------------------------
+ // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s
+ // -------------------------------------------------------------------------------
+ reprintLine(separator);
+ final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal;
+ String footer = getFooter(keys.size(), completed.size(), progress, startTime);
+ reprintLineWithColorAsBold(footer, Ansi.Color.RED);
+ reprintLine(separator);
+ }
+
+ // Map 1 ..........
+ private String getNameWithProgress(String s, int complete, int total) {
+ float percent = total == 0 ? 0.0f : (float) complete / (float) total;
+ // let's use the remaining space in column 1 as a progress bar
+ int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
+ String result = s + " ";
+ int toFill = (int) (spaceRemaining * percent);
+ for (int i = 0; i < toFill; i++) {
+ result += ".";
+ }
+ return result;
+ }
+
+ // VERTICES: 03/04 [==================>>-----] 86% ELAPSED TIME: 1.71 s
+ private String getFooter(int keySize, int completedSize, float progress, long startTime) {
+ String verticesSummary = String.format("VERTICES: %02d/%02d", completedSize, keySize);
+ String progressBar = getInPlaceProgressBar(progress);
+ final int progressPercent = (int) (progress * 100);
+ String progressStr = "" + progressPercent + "%";
+ float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
+ String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s";
+ String footer = String.format(FOOTER_FORMAT,
+ verticesSummary, progressBar, progressStr, elapsedTime);
+ return footer;
+ }
+
+ // [==================>>-----]
+ private String getInPlaceProgressBar(float percent) {
+ StringBuilder bar = new StringBuilder("[");
+ int remainingChars = progressBarChars - 4;
+ int completed = (int) (remainingChars * percent);
+ int pending = remainingChars - completed;
+ for (int i = 0; i < completed; i++) {
+ bar.append("=");
+ }
+ bar.append(">>");
+ for (int i = 0; i < pending; i++) {
+ bar.append("-");
+ }
+ bar.append("]");
+ return bar.toString();
+ }
+
private String printStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
+ String report = getReport(progressMap);
+ if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
+ console.printInfo(report);
+ lastPrintTime = System.currentTimeMillis();
+ }
+ return report;
+ }
+
+ private String getReport(Map<String, Progress> progressMap) {
StringBuffer reportBuffer = new StringBuffer();
SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
@@ -231,7 +735,7 @@ public class TezJobMonitor {
final int complete = progress.getSucceededTaskCount();
final int total = progress.getTotalTaskCount();
final int running = progress.getRunningTaskCount();
- final int failed = progress.getFailedTaskCount();
+ final int failed = progress.getFailedTaskAttemptCount();
if (total <= 0) {
reportBuffer.append(String.format("%s: -/-\t", s, complete, total));
} else {
@@ -258,12 +762,6 @@ public class TezJobMonitor {
}
}
- String report = reportBuffer.toString();
- if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
- console.printInfo(report);
- lastPrintTime = System.currentTimeMillis();
- }
-
- return report;
+ return reportBuffer.toString();
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1636230&r1=1636229&r2=1636230&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Nov 3 02:21:01 2014
@@ -163,7 +163,7 @@ public class TezTask extends Task<TezWor
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor();
- rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf);
+ rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag);
// fetch the counters
Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1636230&r1=1636229&r2=1636230&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Mon Nov 3 02:21:01 2014
@@ -18,14 +18,14 @@
package org.apache.hadoop.hive.ql.log;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.session.SessionState;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* PerfLogger.
*
@@ -147,10 +147,37 @@ public class PerfLogger {
}
public Long getStartTime(String method) {
- return startTimes.get(method);
+ long startTime = 0L;
+
+ if (startTimes.containsKey(method)) {
+ startTime = startTimes.get(method);
+ }
+ return startTime;
}
public Long getEndTime(String method) {
- return endTimes.get(method);
+ long endTime = 0L;
+
+ if (endTimes.containsKey(method)) {
+ endTime = endTimes.get(method);
+ }
+ return endTime;
}
+
+ public boolean startTimeHasMethod(String method) {
+ return startTimes.containsKey(method);
+ }
+
+ public boolean endTimeHasMethod(String method) {
+ return endTimes.containsKey(method);
+ }
+
+ public Long getDuration(String method) {
+ long duration = 0;
+ if (startTimes.containsKey(method) && endTimes.containsKey(method)) {
+ duration = endTimes.get(method) - startTimes.get(method);
+ }
+ return duration;
+ }
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1636230&r1=1636229&r2=1636230&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Nov 3 02:21:01 2014
@@ -805,6 +805,14 @@ public class SessionState {
return (ss != null) ? ss.getIsSilent() : isSilent;
}
+ public void logInfo(String info) {
+ logInfo(info, null);
+ }
+
+ public void logInfo(String info, String detail) {
+ LOG.info(info + StringUtils.defaultString(detail));
+ }
+
public void printInfo(String info) {
printInfo(info, null);
}