You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2017/02/07 20:12:37 UTC

[4/4] hive git commit: HIVE-15473: Progress Bar on Beeline client (Anishek Agarwal via Thejas Nair)

HIVE-15473: Progress Bar on Beeline client  (Anishek Agarwal via Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3e01ef32
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3e01ef32
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3e01ef32

Branch: refs/heads/master
Commit: 3e01ef3268ffbcb69c5c18c2c9f8810512c91bf8
Parents: f6cdbc8
Author: Anishek Agarwal <an...@gmail.com>
Authored: Fri Jan 6 14:31:21 2017 +0530
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Tue Feb 7 12:12:27 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/hive/beeline/Commands.java  |   84 +-
 .../logs/BeelineInPlaceUpdateStream.java        |   66 ++
 common/pom.xml                                  |    5 +
 .../hadoop/hive/common/log/InPlaceUpdate.java   |  202 ++++
 .../hadoop/hive/common/log/ProgressMonitor.java |   51 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    7 +-
 .../TestOperationLoggingAPIWithMr.java          |    2 +-
 .../TestOperationLoggingAPIWithTez.java         |    2 +-
 .../org/apache/hive/jdbc/HiveStatement.java     |   13 +
 .../hive/jdbc/logs/InPlaceUpdateStream.java     |   14 +
 ql/pom.xml                                      |    5 -
 .../hadoop/hive/ql/exec/InPlaceUpdates.java     |   89 --
 .../hive/ql/exec/SerializationUtilities.java    |    1 -
 .../ql/exec/spark/status/SparkJobMonitor.java   |    6 +-
 .../hive/ql/exec/tez/TezJobExecHelper.java      |    5 +-
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  | 1016 -----------------
 .../hive/ql/exec/tez/TezSessionState.java       |    8 +-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |    6 +-
 .../hive/ql/exec/tez/monitoring/Constants.java  |    7 +
 .../hive/ql/exec/tez/monitoring/DAGSummary.java |  197 ++++
 .../exec/tez/monitoring/FSCountersSummary.java  |   92 ++
 .../ql/exec/tez/monitoring/LLAPioSummary.java   |  108 ++
 .../ql/exec/tez/monitoring/PrintSummary.java    |    7 +
 .../QueryExecutionBreakdownSummary.java         |   75 ++
 .../ql/exec/tez/monitoring/TezJobMonitor.java   |  397 +++++++
 .../exec/tez/monitoring/TezProgressMonitor.java |  313 ++++++
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   11 +-
 .../hadoop/hive/ql/session/SessionState.java    |   12 +
 .../tez/monitoring/TestTezProgressMonitor.java  |  101 ++
 service-rpc/if/TCLIService.thrift               |   26 +-
 .../gen/thrift/gen-cpp/TCLIService_types.cpp    |  322 ++++++
 .../src/gen/thrift/gen-cpp/TCLIService_types.h  |  102 +-
 .../rpc/thrift/TGetOperationStatusReq.java      |  109 +-
 .../rpc/thrift/TGetOperationStatusResp.java     |  116 +-
 .../service/rpc/thrift/TJobExecutionStatus.java |   48 +
 .../service/rpc/thrift/TProgressUpdateResp.java | 1033 ++++++++++++++++++
 service-rpc/src/gen/thrift/gen-php/Types.php    |  327 ++++++
 .../src/gen/thrift/gen-py/TCLIService/ttypes.py |  214 +++-
 .../gen/thrift/gen-rb/t_c_l_i_service_types.rb  |   51 +-
 .../org/apache/hive/service/cli/CLIService.java |   63 +-
 .../service/cli/EmbeddedCLIServiceClient.java   |    4 +-
 .../apache/hive/service/cli/ICLIService.java    |    2 +-
 .../hive/service/cli/JobProgressUpdate.java     |   38 +
 .../hive/service/cli/OperationStatus.java       |    8 +
 .../cli/ProgressMonitorStatusMapper.java        |   19 +
 .../cli/TezProgressMonitorStatusMapper.java     |   32 +
 .../thrift/RetryingThriftCLIServiceClient.java  |    5 +-
 .../service/cli/thrift/ThriftCLIService.java    |   28 +-
 .../cli/thrift/ThriftCLIServiceClient.java      |    3 +-
 .../apache/hive/service/cli/CLIServiceTest.java |   18 +-
 .../cli/TestRetryingThriftCLIServiceClient.java |    2 +-
 .../cli/thrift/ThriftCLIServiceTest.java        |    8 +-
 .../thrift/ThriftCliServiceTestWithCookie.java  |    2 +-
 53 files changed, 4268 insertions(+), 1214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/beeline/src/java/org/apache/hive/beeline/Commands.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java
index 748546d..99db643 100644
--- a/beeline/src/java/org/apache/hive/beeline/Commands.java
+++ b/beeline/src/java/org/apache/hive/beeline/Commands.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.SystemVariables;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hive.beeline.logs.BeelineInPlaceUpdateStream;
 import org.apache.hive.jdbc.HiveStatement;
 import org.apache.hive.jdbc.Utils;
 import org.apache.hive.jdbc.Utils.JdbcConnectionParams;
@@ -982,6 +983,11 @@ public class Commands {
             logThread = new Thread(createLogRunnable(stmnt));
             logThread.setDaemon(true);
             logThread.start();
+            if (stmnt instanceof HiveStatement) {
+              ((HiveStatement) stmnt).setInPlaceUpdateStream(
+                  new BeelineInPlaceUpdateStream(beeLine.getOutputStream())
+              );
+            }
             hasResults = stmnt.execute(sql);
             logThread.interrupt();
             logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
@@ -1242,43 +1248,65 @@ public class Commands {
     command.setLength(0);
   }
 
-  private Runnable createLogRunnable(Statement statement) {
+  private Runnable createLogRunnable(final Statement statement) {
     if (statement instanceof HiveStatement) {
-      final HiveStatement hiveStatement = (HiveStatement) statement;
-
-      Runnable runnable = new Runnable() {
-        @Override
-        public void run() {
-          while (hiveStatement.hasMoreLogs()) {
-            try {
-              // fetch the log periodically and output to beeline console
-              for (String log : hiveStatement.getQueryLog()) {
-                beeLine.info(log);
-              }
-              Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
-            } catch (SQLException e) {
-              beeLine.error(new SQLWarning(e));
-              return;
-            } catch (InterruptedException e) {
-              beeLine.debug("Getting log thread is interrupted, since query is done!");
-              showRemainingLogsIfAny(hiveStatement);
-              return;
-            }
-          }
-        }
-      };
-      return runnable;
+      return new LogRunnable(this, (HiveStatement) statement,
+          DEFAULT_QUERY_PROGRESS_INTERVAL);
     } else {
-      beeLine.debug("The statement instance is not HiveStatement type: " + statement.getClass());
+      beeLine.debug(
+          "The statement instance is not HiveStatement type: " + statement
+              .getClass());
       return new Runnable() {
-        @Override
-        public void run() {
+        @Override public void run() {
           // do nothing.
         }
       };
     }
   }
 
+  private void error(Throwable throwable) {
+    beeLine.error(throwable);
+  }
+
+  private void debug(String message) {
+    beeLine.debug(message);
+  }
+
+
+
+  static class LogRunnable implements Runnable {
+    private final Commands commands;
+    private final HiveStatement hiveStatement;
+    private final long queryProgressInterval;
+
+    LogRunnable(Commands commands, HiveStatement hiveStatement,
+        long queryProgressInterval) {
+      this.hiveStatement = hiveStatement;
+      this.commands = commands;
+      this.queryProgressInterval = queryProgressInterval;
+    }
+
+    private void updateQueryLog() throws SQLException {
+      for (String log : hiveStatement.getQueryLog()) {
+        commands.beeLine.info(log);
+      }
+    }
+
+    @Override public void run() {
+      while (hiveStatement.hasMoreLogs()) {
+        try {
+          updateQueryLog();
+          Thread.sleep(queryProgressInterval);
+        } catch (SQLException e) {
+          commands.error(new SQLWarning(e));
+        } catch (InterruptedException e) {
+          commands.debug("Getting log thread is interrupted, since query is done!");
+          commands.showRemainingLogsIfAny(hiveStatement);
+        }
+      }
+    }
+  }
+
   private void showRemainingLogsIfAny(Statement statement) {
     if (statement instanceof HiveStatement) {
       HiveStatement hiveStatement = (HiveStatement) statement;

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
new file mode 100644
index 0000000..2ed289c
--- /dev/null
+++ b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java
@@ -0,0 +1,66 @@
+package org.apache.hive.beeline.logs;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hive.jdbc.logs.InPlaceUpdateStream;
+import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+
+import java.io.PrintStream;
+import java.util.List;
+
+public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
+  private InPlaceUpdate inPlaceUpdate;
+
+  public BeelineInPlaceUpdateStream(PrintStream out) {
+    this.inPlaceUpdate = new InPlaceUpdate(out);
+  }
+
+  @Override
+  public void update(TProgressUpdateResp response) {
+    if (response == null || response.getStatus().equals(TJobExecutionStatus.NOT_AVAILABLE))
+      return;
+
+    inPlaceUpdate.render(new ProgressMonitorWrapper(response));
+  }
+
+  static class ProgressMonitorWrapper implements ProgressMonitor {
+    private TProgressUpdateResp response;
+
+    ProgressMonitorWrapper(TProgressUpdateResp response) {
+      this.response = response;
+    }
+
+    @Override
+    public List<String> headers() {
+      return response.getHeaderNames();
+    }
+
+    @Override
+    public List<List<String>> rows() {
+      return response.getRows();
+    }
+
+    @Override
+    public String footerSummary() {
+      return response.getFooterSummary();
+    }
+
+    @Override
+    public long startTime() {
+      return response.getStartTime();
+    }
+
+    @Override
+    public String executionStatus() {
+      throw new UnsupportedOperationException(
+          "This should never be used for anything. All the required data is available via other methods"
+      );
+    }
+
+    @Override
+    public double progressedPercentage() {
+      return response.getProgressedPercentage();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index fd948f8..8474a87 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -64,6 +64,11 @@
       <artifactId>orc-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>jline</groupId>
+      <artifactId>jline</artifactId>
+      <version>${jline.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.eclipse.jetty.aggregate</groupId>
       <artifactId>jetty-all</artifactId>
       <version>${jetty.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
new file mode 100644
index 0000000..bfdb4fa
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
@@ -0,0 +1,202 @@
+package org.apache.hadoop.hive.common.log;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import jline.TerminalFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.fusesource.jansi.Ansi;
+
+import javax.annotation.Nullable;
+import java.io.PrintStream;
+import java.io.StringWriter;
+import java.text.DecimalFormat;
+import java.util.List;
+
+import static org.fusesource.jansi.Ansi.ansi;
+import static org.fusesource.jansi.internal.CLibrary.*;
+
+/**
+ * Renders information from ProgressMonitor to the stream provided.
+ */
+public class InPlaceUpdate {
+
+  public static final int MIN_TERMINAL_WIDTH = 94;
+
+  // keep this within 80 chars width. If more columns needs to be added then update min terminal
+  // width requirement and SEPARATOR width accordingly
+  private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  %6s  ";
+  private static final String VERTEX_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  %7s  %6s  %6s  ";
+  private static final String FOOTER_FORMAT = "%-15s  %-30s %-4s  %-25s";
+
+  private static final int PROGRESS_BAR_CHARS = 30;
+  private static final String SEPARATOR = new String(new char[MIN_TERMINAL_WIDTH]).replace("\0", "-");
+
+  /* Pretty print the values */
+  private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+  private int lines = 0;
+  private PrintStream out;
+
+  public InPlaceUpdate(PrintStream out) {
+    this.out = out;
+  }
+
+  public InPlaceUpdate() {
+    this(System.out);
+  }
+
+  public static void reprintLine(PrintStream out, String line) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+  }
+
+  public static void rePositionCursor(PrintStream ps) {
+    ps.print(ansi().cursorUp(0).toString());
+    ps.flush();
+  }
+
+  /**
+   * NOTE: Use this method only if isUnixTerminal is true.
+   * Erases the current line and prints the given line.
+   *
+   * @param line - line to print
+   */
+  private void reprintLine(String line) {
+    reprintLine(out, line);
+    lines++;
+  }
+
+  /**
+   * NOTE: Use this method only if isUnixTerminal is true.
+   * Erases the current line and prints the given line with the specified color.
+   *
+   * @param line  - line to print
+   * @param color - color for the line
+   */
+  private void reprintLineWithColorAsBold(String line, Ansi.Color color) {
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
+      .toString());
+    out.flush();
+    lines++;
+  }
+
+  /**
+   * NOTE: Use this method only if isUnixTerminal is true.
+   * Erases the current line and prints the given multiline. Make sure the specified line is not
+   * terminated by linebreak.
+   *
+   * @param line - line to print
+   */
+  private void reprintMultiLine(String line) {
+    int numLines = line.split("\r\n|\r|\n").length;
+    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
+    out.flush();
+    lines += numLines;
+  }
+
+  /**
+   * NOTE: Use this method only if isUnixTerminal is true.
+   * Repositions the cursor back to line 0.
+   */
+  private void repositionCursor() {
+    if (lines > 0) {
+      out.print(ansi().cursorUp(lines).toString());
+      out.flush();
+      lines = 0;
+    }
+  }
+
+
+  // [==================>>-----]
+  private String getInPlaceProgressBar(double percent) {
+    StringWriter bar = new StringWriter();
+    bar.append("[");
+    int remainingChars = PROGRESS_BAR_CHARS - 4;
+    int completed = (int) (remainingChars * percent);
+    int pending = remainingChars - completed;
+    for (int i = 0; i < completed; i++) {
+      bar.append("=");
+    }
+    bar.append(">>");
+    for (int i = 0; i < pending; i++) {
+      bar.append("-");
+    }
+    bar.append("]");
+    return bar.toString();
+  }
+
+  public void render(ProgressMonitor monitor) {
+    if (monitor == null) return;
+    // position the cursor to line 0
+    repositionCursor();
+
+    // print header
+    // -------------------------------------------------------------------------------
+    //         VERTICES     STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
+    // -------------------------------------------------------------------------------
+    reprintLine(SEPARATOR);
+    reprintLineWithColorAsBold(String.format(HEADER_FORMAT, monitor.headers().toArray()),
+      Ansi.Color.CYAN);
+    reprintLine(SEPARATOR);
+
+
+    // Map 1 .......... container  SUCCEEDED      7          7        0        0       0       0
+    List<String> printReady = Lists.transform(monitor.rows(), new Function<List<String>, String>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable List<String> row) {
+        return String.format(VERTEX_FORMAT, row.toArray());
+      }
+    });
+    reprintMultiLine(StringUtils.join(printReady, "\n"));
+
+    // -------------------------------------------------------------------------------
+    // VERTICES: 03/04            [=================>>-----] 86%  ELAPSED TIME: 1.71 s
+    // -------------------------------------------------------------------------------
+    String progressStr = "" + (int) (monitor.progressedPercentage() * 100) + "%";
+    float et = (float) (System.currentTimeMillis() - monitor.startTime()) / (float) 1000;
+    String elapsedTime = "ELAPSED TIME: " + secondsFormatter.format(et) + " s";
+    String footer = String.format(
+      FOOTER_FORMAT,
+      monitor.footerSummary(),
+      getInPlaceProgressBar(monitor.progressedPercentage()),
+      progressStr,
+      elapsedTime);
+
+    reprintLineWithColorAsBold(footer, Ansi.Color.RED);
+    reprintLine(SEPARATOR);
+  }
+
+
+  public static boolean canRenderInPlace(HiveConf conf) {
+    boolean inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
+
+    // we need at least 80 chars wide terminal to display in-place updates properly
+    return inPlaceUpdates && isUnixTerminal() && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
+  }
+
+  private static boolean isUnixTerminal() {
+
+    String os = System.getProperty("os.name");
+    if (os.startsWith("Windows")) {
+      // we do not support Windows, we will revisit this if we really need it for windows.
+      return false;
+    }
+
+    // We must be on some unix variant..
+    // check if standard out is a terminal
+    try {
+      // isatty system call will return 1 if the file descriptor is terminal else 0
+      if (isatty(STDOUT_FILENO) == 0) {
+        return false;
+      }
+      if (isatty(STDERR_FILENO) == 0) {
+        return false;
+      }
+    } catch (NoClassDefFoundError | UnsatisfiedLinkError ignore) {
+      // These errors happen if the JNI lib is not available for your platform.
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
new file mode 100644
index 0000000..ee02ccb
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java
@@ -0,0 +1,51 @@
+package org.apache.hadoop.hive.common.log;
+
+import java.util.Collections;
+import java.util.List;
+
+public interface ProgressMonitor {
+
+  ProgressMonitor NULL = new ProgressMonitor() {
+    @Override
+    public List<String> headers() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public List<List<String>> rows() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public String footerSummary() {
+      return "";
+    }
+
+    @Override
+    public long startTime() {
+      return 0;
+    }
+
+    @Override
+    public String executionStatus() {
+      return "";
+    }
+
+    @Override
+    public double progressedPercentage() {
+      return 0;
+    }
+  };
+
+  List<String> headers();
+
+  List<List<String>> rows();
+
+  String footerSummary();
+
+  long startTime();
+
+  String executionStatus();
+
+  double progressedPercentage();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index cb27cd6..f3b01b2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2841,7 +2841,12 @@ public class HiveConf extends Configuration {
     TEZ_EXEC_INPLACE_PROGRESS(
         "hive.tez.exec.inplace.progress",
         true,
-        "Updates tez job execution progress in-place in the terminal."),
+        "Updates tez job execution progress in-place in the terminal when hive-cli is used."),
+    HIVE_SERVER2_INPLACE_PROGRESS(
+        "hive.server2.in.place.progress",
+        true,
+        "Allows hive server 2 to send progress bar update information. This is currently available"
+            + " only if the execution engine is tez."),
     SPARK_EXEC_INPLACE_PROGRESS("hive.spark.exec.inplace.progress", true,
         "Updates spark job execution progress in-place in the terminal."),
     TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.tez.container.max.java.heap.fraction", 0.8f,

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
index b8462c6..830ffc2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithMr.java
@@ -97,7 +97,7 @@ public class TestOperationLoggingAPIWithMr extends OperationLoggingAPITestBase {
       if (System.currentTimeMillis() > pollTimeout) {
         break;
       }
-      opStatus = client.getOperationStatus(operationHandle);
+      opStatus = client.getOperationStatus(operationHandle, false);
       Assert.assertNotNull(opStatus);
       state = opStatus.getState();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
index 8b5b516..e98406d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPIWithTez.java
@@ -50,7 +50,7 @@ public class TestOperationLoggingAPIWithTez extends OperationLoggingAPITestBase
       "<PERFLOG method=compile from=org.apache.hadoop.hive.ql.Driver>",
       "<PERFLOG method=parse from=org.apache.hadoop.hive.ql.Driver>",
       "<PERFLOG method=Driver.run from=org.apache.hadoop.hive.ql.Driver>",
-      "from=org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor",
+      "from=org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor",
       "org.apache.tez.common.counters.DAGCounter",
       "NUM_SUCCEEDED_TASKS",
       "TOTAL_LAUNCHED_TASKS",

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
index a242501..56860c4 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java
@@ -19,6 +19,7 @@
 package org.apache.hive.jdbc;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.hive.jdbc.logs.InPlaceUpdateStream;
 import org.apache.hive.service.cli.RowSet;
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.rpc.thrift.TCLIService;
@@ -114,6 +115,8 @@ public class HiveStatement implements java.sql.Statement {
 
   private int queryTimeout = 0;
 
+  private InPlaceUpdateStream inPlaceUpdateStream = InPlaceUpdateStream.NO_OP;
+
   public HiveStatement(HiveConnection connection, TCLIService.Iface client,
       TSessionHandle sessHandle) {
     this(connection, client, sessHandle, false, DEFAULT_FETCH_SIZE);
@@ -342,6 +345,7 @@ public class HiveStatement implements java.sql.Statement {
 
   TGetOperationStatusResp waitForOperationToComplete() throws SQLException {
     TGetOperationStatusReq statusReq = new TGetOperationStatusReq(stmtHandle);
+    statusReq.setGetProgressUpdate(inPlaceUpdateStream != InPlaceUpdateStream.NO_OP);
     TGetOperationStatusResp statusResp = null;
 
     // Poll on the operation status, till the operation is complete
@@ -352,6 +356,7 @@ public class HiveStatement implements java.sql.Statement {
          * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires
          */
         statusResp = client.GetOperationStatus(statusReq);
+        inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse());
         Utils.verifySuccessWithInfo(statusResp.getStatus());
         if (statusResp.isSetOperationState()) {
           switch (statusResp.getOperationState()) {
@@ -951,4 +956,12 @@ public class HiveStatement implements java.sql.Statement {
     }
     return null;
   }
+
+  /**
+   * This is only used by the beeline client to set the stream on which in place progress updates
+   * are to be shown
+   */
+  public void setInPlaceUpdateStream(InPlaceUpdateStream stream) {
+    this.inPlaceUpdateStream = stream;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
new file mode 100644
index 0000000..3a682b2
--- /dev/null
+++ b/jdbc/src/java/org/apache/hive/jdbc/logs/InPlaceUpdateStream.java
@@ -0,0 +1,14 @@
+package org.apache.hive.jdbc.logs;
+
+import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+
+public interface InPlaceUpdateStream {
+  void update(TProgressUpdateResp response);
+
+  InPlaceUpdateStream NO_OP = new InPlaceUpdateStream() {
+    @Override
+    public void update(TProgressUpdateResp response) {
+
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 84e83ee..1e6ba9a 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -463,11 +463,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>jline</groupId>
-      <artifactId>jline</artifactId>
-      <version>${jline.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-api</artifactId>
       <version>${tez.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
deleted file mode 100644
index f59d8e2..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/InPlaceUpdates.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import static org.fusesource.jansi.Ansi.ansi;
-import static org.fusesource.jansi.internal.CLibrary.STDERR_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.STDOUT_FILENO;
-import static org.fusesource.jansi.internal.CLibrary.isatty;
-
-import java.io.PrintStream;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.fusesource.jansi.Ansi;
-
-import jline.TerminalFactory;
-
-public class InPlaceUpdates {
-
-  public static final int MIN_TERMINAL_WIDTH = 94;
-
-  static boolean isUnixTerminal() {
-
-    String os = System.getProperty("os.name");
-    if (os.startsWith("Windows")) {
-      // we do not support Windows, we will revisit this if we really need it for windows.
-      return false;
-    }
-
-    // We must be on some unix variant..
-    // check if standard out is a terminal
-    try {
-      // isatty system call will return 1 if the file descriptor is terminal else 0
-      if (isatty(STDOUT_FILENO) == 0) {
-        return false;
-      }
-      if (isatty(STDERR_FILENO) == 0) {
-        return false;
-      }
-    } catch (NoClassDefFoundError ignore) {
-      // These errors happen if the JNI lib is not available for your platform.
-      return false;
-    } catch (UnsatisfiedLinkError ignore) {
-      // These errors happen if the JNI lib is not available for your platform.
-      return false;
-    }
-    return true;
-  }
-
-  public static boolean inPlaceEligible(HiveConf conf) {
-    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
-    boolean inPlaceUpdates = false;
-    if (engine.equals("tez")) {
-      inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
-    }
-    if (engine.equals("spark")) {
-      inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS);
-    }
-
-    // we need at least 80 chars wide terminal to display in-place updates properly
-    return inPlaceUpdates && !SessionState.getConsole().getIsSilent() && isUnixTerminal()
-        && TerminalFactory.get().getWidth() >= MIN_TERMINAL_WIDTH;
-  }
-
-  public static void reprintLine(PrintStream out, String line) {
-    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
-    out.flush();
-  }
-
-  public static void rePositionCursor(PrintStream ps) {
-    ps.print(ansi().cursorUp(0).toString());
-    ps.flush();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 7be628e..247d589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -38,7 +38,6 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index d5b9b5d..cf0162d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.fusesource.jansi.Ansi;
@@ -82,7 +82,7 @@ abstract class SparkJobMonitor {
   protected SparkJobMonitor(HiveConf hiveConf) {
     monitorTimeoutInterval = hiveConf.getTimeVar(
         HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS);
-    inPlaceUpdate = InPlaceUpdates.inPlaceEligible(hiveConf);
+    inPlaceUpdate = InPlaceUpdate.canRenderInPlace(hiveConf) && !SessionState.getConsole().getIsSilent();
     console = SessionState.getConsole();
     out = SessionState.LogHelper.getInfoStream();
   }
@@ -270,7 +270,7 @@ abstract class SparkJobMonitor {
   }
 
   private void reprintLine(String line) {
-    InPlaceUpdates.reprintLine(out, line);
+    InPlaceUpdate.reprintLine(out, line);
     lines++;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
index a3fc815..a544b93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobExecHelper.java
@@ -18,10 +18,11 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import java.lang.reflect.Method;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Method;
+
 /**
  * TezJobExecHelper is a utility to safely call Tez functionality from
  * common code paths. It will check if tez is available/installed before
@@ -37,7 +38,7 @@ public class TezJobExecHelper {
 
       // we have tez installed
       ClassLoader classLoader = TezJobExecHelper.class.getClassLoader();
-      Method method = classLoader.loadClass("org.apache.hadoop.hive.ql.exec.tez.TezJobMonitor")
+      Method method = classLoader.loadClass("org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor")
         .getMethod("killRunningJobs");
       method.invoke(null, null);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
deleted file mode 100644
index bd935d4..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ /dev/null
@@ -1,1016 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.tez;
-
-import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
-import static org.fusesource.jansi.Ansi.ansi;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
-import org.apache.hadoop.hive.ql.exec.MapOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hive.common.util.ShutdownHookManager;
-import org.apache.tez.common.counters.FileSystemCounter;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.client.VertexStatus;
-import org.fusesource.jansi.Ansi;
-
-import com.google.common.base.Preconditions;
-
-/**
- * TezJobMonitor keeps track of a tez job while it's being executed. It will
- * print status to the console and retrieve final status of the job after
- * completion.
- */
-public class TezJobMonitor {
-
-  private static final String CLASS_NAME = TezJobMonitor.class.getName();
-
-  private static final int COLUMN_1_WIDTH = 16;
-  private static final int SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH;
-  private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdates.MIN_TERMINAL_WIDTH + 34;
-  private static final String SEPARATOR = new String(new char[SEPARATOR_WIDTH]).replace("\0", "-");
-  private static final String FILE_HEADER_SEPARATOR =
-      new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-");
-  private static final String QUERY_EXEC_SUMMARY_HEADER = "Query Execution Summary";
-  private static final String TASK_SUMMARY_HEADER = "Task Execution Summary";
-  private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
-  private static final String FS_COUNTERS_SUMMARY_HEADER = "FileSystem Counters Summary";
-
-  // keep this within 80 chars width. If more columns needs to be added then update min terminal
-  // width requirement and SEPARATOR width accordingly
-  private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  %6s  ";
-  private static final String VERTEX_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  %7s  %6s  %6s  ";
-  private static final String FOOTER_FORMAT = "%-15s  %-30s %-4s  %-25s";
-  private static final String HEADER = String.format(HEADER_FORMAT,
-      "VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");
-
-  // method and dag summary format
-  private static final String SUMMARY_HEADER_FORMAT = "%10s %14s %13s %12s %14s %15s";
-  private static final String SUMMARY_HEADER = String.format(SUMMARY_HEADER_FORMAT,
-      "VERTICES", "DURATION(ms)", "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS");
-
-  // used when I/O redirection is used
-  private static final String FILE_HEADER_FORMAT = "%10s %12s %16s %13s %14s %13s %12s %14s %15s";
-  private static final String FILE_HEADER = String.format(FILE_HEADER_FORMAT,
-      "VERTICES", "TOTAL_TASKS", "FAILED_ATTEMPTS", "KILLED_TASKS", "DURATION(ms)",
-      "CPU_TIME(ms)", "GC_TIME(ms)", "INPUT_RECORDS", "OUTPUT_RECORDS");
-
-  // LLAP counters
-  private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s";
-  private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
-      "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
-      "ALLOCATION", "USED", "TOTAL_IO");
-
-  // FileSystem counters
-  private static final String FS_COUNTERS_HEADER_FORMAT = "%10s %15s %13s %18s %18s %13s";
-
-  // Methods summary
-  private static final String OPERATION_SUMMARY = "%-35s %9s";
-  private static final String OPERATION = "OPERATION";
-  private static final String DURATION = "DURATION";
-
-  // in-place progress update related variables
-  private int lines;
-  private final PrintStream out;
-
-  private transient LogHelper console;
-  private final PerfLogger perfLogger = SessionState.getPerfLogger();
-  private final int checkInterval = 200;
-  private final int maxRetryInterval = 2500;
-  private final int printInterval = 3000;
-  private final int progressBarChars = 30;
-  private long lastPrintTime;
-  private Set<String> completed;
-
-  /* Pretty print the values */
-  private final NumberFormat secondsFormat;
-  private final NumberFormat commaFormat;
-  private static final List<DAGClient> shutdownList;
-  private final Map<String, BaseWork> workMap;
-
-  private StringBuffer diagnostics;
-
-  static {
-    shutdownList = new LinkedList<DAGClient>();
-    ShutdownHookManager.addShutdownHook(new Runnable() {
-      @Override
-      public void run() {
-        TezJobMonitor.killRunningJobs();
-        try {
-          TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
-        } catch (Exception e) {
-          // ignore
-        }
-      }
-    });
-  }
-
-  public static void initShutdownHook() {
-    Preconditions.checkNotNull(shutdownList,
-        "Shutdown hook was not properly initialized");
-  }
-
-  public TezJobMonitor(Map<String, BaseWork> workMap) {
-    this.workMap = workMap;
-    console = SessionState.getConsole();
-    secondsFormat = new DecimalFormat("#0.00");
-    commaFormat = NumberFormat.getNumberInstance(Locale.US);
-    // all progress updates are written to info stream and log file. In-place updates can only be
-    // done to info stream (console)
-    out = console.getInfoStream();
-  }
-
-  /**
-   * NOTE: Use this method only if isUnixTerminal is true.
-   * Erases the current line and prints the given line.
-   * @param line - line to print
-   */
-  public void reprintLine(String line) {
-    InPlaceUpdates.reprintLine(out, line);
-    lines++;
-  }
-
-  /**
-   * NOTE: Use this method only if isUnixTerminal is true.
-   * Erases the current line and prints the given line with the specified color.
-   * @param line - line to print
-   * @param color - color for the line
-   */
-  public void reprintLineWithColorAsBold(String line, Ansi.Color color) {
-    out.print(ansi().eraseLine(Ansi.Erase.ALL).fg(color).bold().a(line).a('\n').boldOff().reset()
-        .toString());
-    out.flush();
-    lines++;
-  }
-
-  /**
-   * NOTE: Use this method only if isUnixTerminal is true.
-   * Erases the current line and prints the given multiline. Make sure the specified line is not
-   * terminated by linebreak.
-   * @param line - line to print
-   */
-  public void reprintMultiLine(String line) {
-    int numLines = line.split("\r\n|\r|\n").length;
-    out.print(ansi().eraseLine(Ansi.Erase.ALL).a(line).a('\n').toString());
-    out.flush();
-    lines += numLines;
-  }
-
-  /**
-   * NOTE: Use this method only if isUnixTerminal is true.
-   * Repositions the cursor back to line 0.
-   */
-  public void repositionCursor() {
-    if (lines > 0) {
-      out.print(ansi().cursorUp(lines).toString());
-      out.flush();
-      lines = 0;
-    }
-  }
-
-  /**
-   * monitorExecution handles status printing, failures during execution and final status retrieval.
-   *
-   * @param dagClient client that was used to kick off the job
-   * @param conf configuration file for this operation
-   * @return int 0 - success, 1 - killed, 2 - failed
-   */
-  public int monitorExecution(final DAGClient dagClient, HiveConf conf,
-      DAG dag, Context ctx) throws InterruptedException {
-    long monitorStartTime = System.currentTimeMillis();
-    DAGStatus status = null;
-    completed = new HashSet<String>();
-    diagnostics = new StringBuffer();
-
-    boolean running = false;
-    boolean done = false;
-    boolean success = false;
-    int failedCounter = 0;
-    int rc = 0;
-    DAGStatus.State lastState = null;
-    String lastReport = null;
-    Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
-    long startTime = 0;
-    boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
-        Utilities.isPerfOrAboveLogging(conf);
-    boolean llapIoEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED, false);
-
-    boolean inPlaceEligible = InPlaceUpdates.inPlaceEligible(conf);
-    synchronized(shutdownList) {
-      shutdownList.add(dagClient);
-    }
-    console.printInfo("\n");
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
-    Map<String, Progress> progressMap = null;
-    while (true) {
-
-      try {
-        if (ctx != null) {
-          ctx.checkHeartbeaterLockException();
-        }
-
-        status = dagClient.getDAGStatus(opts, checkInterval);
-        progressMap = status.getVertexProgress();
-        DAGStatus.State state = status.getState();
-
-        if (state != lastState || state == RUNNING) {
-          lastState = state;
-
-          switch (state) {
-          case SUBMITTED:
-            console.printInfo("Status: Submitted");
-            break;
-          case INITING:
-            console.printInfo("Status: Initializing");
-            startTime = System.currentTimeMillis();
-            break;
-          case RUNNING:
-            if (!running) {
-              perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
-              console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
-              startTime = System.currentTimeMillis();
-              running = true;
-            }
-
-            if (inPlaceEligible) {
-              printStatusInPlace(progressMap, startTime, false, dagClient);
-              // log the progress report to log file as well
-              lastReport = logStatus(progressMap, lastReport, console);
-            } else {
-              lastReport = printStatus(progressMap, lastReport, console);
-            }
-            break;
-          case SUCCEEDED:
-            if (!running) {
-              startTime = monitorStartTime;
-            }
-            if (inPlaceEligible) {
-              printStatusInPlace(progressMap, startTime, false, dagClient);
-              // log the progress report to log file as well
-              lastReport = logStatus(progressMap, lastReport, console);
-            } else {
-              lastReport = printStatus(progressMap, lastReport, console);
-            }
-            success = true;
-            running = false;
-            done = true;
-            break;
-          case KILLED:
-            if (!running) {
-              startTime = monitorStartTime;
-            }
-            if (inPlaceEligible) {
-              printStatusInPlace(progressMap, startTime, true, dagClient);
-              // log the progress report to log file as well
-              lastReport = logStatus(progressMap, lastReport, console);
-            }
-            console.printInfo("Status: Killed");
-            running = false;
-            done = true;
-            rc = 1;
-            break;
-          case FAILED:
-          case ERROR:
-            if (!running) {
-              startTime = monitorStartTime;
-            }
-            if (inPlaceEligible) {
-              printStatusInPlace(progressMap, startTime, true, dagClient);
-              // log the progress report to log file as well
-              lastReport = logStatus(progressMap, lastReport, console);
-            }
-            console.printError("Status: Failed");
-            running = false;
-            done = true;
-            rc = 2;
-            break;
-          }
-        }
-      } catch (Exception e) {
-        console.printInfo("Exception: " + e.getMessage());
-        boolean isInterrupted = hasInterruptedException(e);
-        if (isInterrupted || (++failedCounter % maxRetryInterval / checkInterval == 0)) {
-          try {
-            console.printInfo("Killing DAG...");
-            dagClient.tryKillDAG();
-          } catch (IOException io) {
-            // best effort
-          } catch (TezException te) {
-            // best effort
-          }
-          e.printStackTrace();
-          console.printError("Execution has failed.");
-          rc = 1;
-          done = true;
-        } else {
-          console.printInfo("Retrying...");
-        }
-      } finally {
-        if (done) {
-          if (rc != 0 && status != null) {
-            for (String diag : status.getDiagnostics()) {
-              console.printError(diag);
-              diagnostics.append(diag);
-            }
-          }
-          synchronized(shutdownList) {
-            shutdownList.remove(dagClient);
-          }
-          break;
-        }
-      }
-    }
-
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
-
-    if (isProfileEnabled && success && progressMap != null) {
-
-      double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-      console.printInfo("Status: DAG finished successfully in "
-          + String.format("%.2f seconds", duration));
-      console.printInfo("");
-
-      console.printInfo(QUERY_EXEC_SUMMARY_HEADER);
-      printQueryExecutionBreakDown();
-      console.printInfo(SEPARATOR);
-      console.printInfo("");
-
-      console.printInfo(TASK_SUMMARY_HEADER);
-      printDagSummary(progressMap, console, dagClient, conf, dag, inPlaceEligible);
-      if (inPlaceEligible) {
-        console.printInfo(SEPARATOR);
-      } else {
-        console.printInfo(FILE_HEADER_SEPARATOR);
-      }
-
-      if (llapIoEnabled) {
-        console.printInfo("");
-        console.printInfo(LLAP_IO_SUMMARY_HEADER);
-        printLlapIOSummary(progressMap, console, dagClient);
-        console.printInfo(SEPARATOR);
-        console.printInfo("");
-
-        console.printInfo(FS_COUNTERS_SUMMARY_HEADER);
-        printFSCountersSummary(progressMap, console, dagClient);
-      }
-
-      console.printInfo("");
-    }
-
-    return rc;
-  }
-
-  private static boolean hasInterruptedException(Throwable e) {
-    // Hadoop IPC wraps InterruptedException. GRRR.
-    while (e != null) {
-      if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
-        return true;
-      }
-      e = e.getCause();
-    }
-    return false;
-  }
-
-  /**
-   * killRunningJobs tries to terminate execution of all
-   * currently running tez queries. No guarantees, best effort only.
-   */
-  public static void killRunningJobs() {
-    synchronized (shutdownList) {
-      for (DAGClient c : shutdownList) {
-        try {
-          System.err.println("Trying to shutdown DAG");
-          c.tryKillDAG();
-        } catch (Exception e) {
-          // ignore
-        }
-      }
-    }
-  }
-
-  private static long getCounterValueByGroupName(TezCounters vertexCounters,
-      String groupNamePattern,
-      String counterName) {
-    TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
-    return (tezCounter == null) ? 0 : tezCounter.getValue();
-  }
-
-  private void printQueryExecutionBreakDown() {
-
-    /* Build the method summary header */
-    String execBreakdownHeader = String.format(OPERATION_SUMMARY, OPERATION, DURATION);
-    console.printInfo(SEPARATOR);
-    reprintLineWithColorAsBold(execBreakdownHeader, Ansi.Color.CYAN);
-    console.printInfo(SEPARATOR);
-
-    // parse, analyze, optimize and compile
-    long compile = perfLogger.getEndTime(PerfLogger.COMPILE) -
-        perfLogger.getStartTime(PerfLogger.COMPILE);
-    console.printInfo(String.format(OPERATION_SUMMARY, "Compile Query",
-        secondsFormat.format(compile / 1000.0) + "s"));
-
-    // prepare plan for submission (building DAG, adding resources, creating scratch dirs etc.)
-    long totalDAGPrep = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG) -
-        perfLogger.getEndTime(PerfLogger.COMPILE);
-    console.printInfo(String.format(OPERATION_SUMMARY, "Prepare Plan",
-        secondsFormat.format(totalDAGPrep / 1000.0) + "s"));
-
-    // submit to accept dag (if session is closed, this will include re-opening of session time,
-    // localizing files for AM, submitting DAG)
-    long submitToAccept = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) -
-        perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG);
-    console.printInfo(String.format(OPERATION_SUMMARY, "Submit Plan",
-        secondsFormat.format(submitToAccept / 1000.0) + "s"));
-
-    // accept to start dag (schedule wait time, resource wait time etc.)
-    long acceptToStart = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
-    console.printInfo(String.format(OPERATION_SUMMARY, "Start DAG",
-        secondsFormat.format(acceptToStart / 1000.0) + "s"));
-
-    // time to actually run the dag (actual dag runtime)
-    final long startToEnd;
-    if (acceptToStart == 0) {
-      startToEnd = perfLogger.getDuration(PerfLogger.TEZ_RUN_DAG);
-    } else {
-      startToEnd = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG) -
-          perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
-    }
-    console.printInfo(String.format(OPERATION_SUMMARY, "Run DAG",
-        secondsFormat.format(startToEnd / 1000.0) + "s"));
-
-  }
-
-  private void printDagSummary(Map<String, Progress> progressMap, LogHelper console,
-      DAGClient dagClient, HiveConf conf, DAG dag, final boolean inPlaceEligible) {
-
-    /* Strings for headers and counters */
-    String hiveCountersGroup = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
-    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-    TezCounters hiveCounters = null;
-    try {
-      hiveCounters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters();
-    } catch (IOException e) {
-      // best attempt, shouldn't really kill DAG for this
-    } catch (TezException e) {
-      // best attempt, shouldn't really kill DAG for this
-    }
-
-    /* If the counters are missing there is no point trying to print progress */
-    if (hiveCounters == null) {
-      return;
-    }
-
-    /* Print the per Vertex summary */
-    if (inPlaceEligible) {
-      console.printInfo(SEPARATOR);
-      reprintLineWithColorAsBold(SUMMARY_HEADER, Ansi.Color.CYAN);
-      console.printInfo(SEPARATOR);
-    } else {
-      console.printInfo(FILE_HEADER_SEPARATOR);
-      reprintLineWithColorAsBold(FILE_HEADER, Ansi.Color.CYAN);
-      console.printInfo(FILE_HEADER_SEPARATOR);
-    }
-    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
-    Set<StatusGetOpts> statusOptions = new HashSet<StatusGetOpts>(1);
-    statusOptions.add(StatusGetOpts.GET_COUNTERS);
-    for (String vertexName : keys) {
-      Progress progress = progressMap.get(vertexName);
-      if (progress != null) {
-        final int totalTasks = progress.getTotalTaskCount();
-        final int failedTaskAttempts = progress.getFailedTaskAttemptCount();
-        final int killedTaskAttempts = progress.getKilledTaskAttemptCount();
-        final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName);
-        VertexStatus vertexStatus = null;
-        try {
-          vertexStatus = dagClient.getVertexStatus(vertexName, statusOptions);
-        } catch (IOException e) {
-          // best attempt, shouldn't really kill DAG for this
-        } catch (TezException e) {
-          // best attempt, shouldn't really kill DAG for this
-        }
-
-        if (vertexStatus == null) {
-          continue;
-        }
-
-        Vertex currentVertex = dag.getVertex(vertexName);
-        List<Vertex> inputVerticesList = currentVertex.getInputVertices();
-        long hiveInputRecordsFromOtherVertices = 0;
-        if (inputVerticesList.size() > 0) {
-
-          for (Vertex inputVertex : inputVerticesList) {
-            String inputVertexName = inputVertex.getName();
-            hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters,
-                hiveCountersGroup, String.format("%s_",
-                    ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString()) +
-                    inputVertexName.replace(" ", "_"));
-
-            hiveInputRecordsFromOtherVertices += getCounterValueByGroupName(hiveCounters,
-                hiveCountersGroup, String.format("%s_",
-                    FileSinkOperator.Counter.RECORDS_OUT.toString()) +
-                    inputVertexName.replace(" ", "_"));
-          }
-        }
-
-      /*
-       * Get the CPU & GC
-       *
-       * counters org.apache.tez.common.counters.TaskCounter
-       *  GC_TIME_MILLIS=37712
-       *  CPU_MILLISECONDS=2774230
-       */
-        final TezCounters vertexCounters = vertexStatus.getVertexCounters();
-        final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters,
-            TaskCounter.class.getName(),
-            TaskCounter.CPU_MILLISECONDS.name());
-
-        final double gcTimeMillis = getCounterValueByGroupName(vertexCounters,
-            TaskCounter.class.getName(),
-            TaskCounter.GC_TIME_MILLIS.name());
-
-      /*
-       * Get the HIVE counters
-       *
-       * HIVE
-       *  CREATED_FILES=1
-       *  DESERIALIZE_ERRORS=0
-       *  RECORDS_IN_Map_1=550076554
-       *  RECORDS_OUT_INTERMEDIATE_Map_1=854987
-       *  RECORDS_OUT_Reducer_2=1
-       */
-
-        final long hiveInputRecords =
-            getCounterValueByGroupName(
-                hiveCounters,
-                hiveCountersGroup,
-                String.format("%s_", MapOperator.Counter.RECORDS_IN.toString())
-                    + vertexName.replace(" ", "_"))
-                + hiveInputRecordsFromOtherVertices;
-        final long hiveOutputIntermediateRecords =
-            getCounterValueByGroupName(
-                hiveCounters,
-                hiveCountersGroup,
-                String.format("%s_", ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString())
-                    + vertexName.replace(" ", "_"));
-        final long hiveOutputRecords =
-            getCounterValueByGroupName(
-                hiveCounters,
-                hiveCountersGroup,
-                String.format("%s_", FileSinkOperator.Counter.RECORDS_OUT.toString())
-                    + vertexName.replace(" ", "_"))
-                + hiveOutputIntermediateRecords;
-
-        final String vertexExecutionStats;
-        if (inPlaceEligible) {
-          vertexExecutionStats = String.format(SUMMARY_HEADER_FORMAT,
-              vertexName,
-              secondsFormat.format((duration)),
-              commaFormat.format(cpuTimeMillis),
-              commaFormat.format(gcTimeMillis),
-              commaFormat.format(hiveInputRecords),
-              commaFormat.format(hiveOutputRecords));
-        } else {
-          vertexExecutionStats = String.format(FILE_HEADER_FORMAT,
-              vertexName,
-              totalTasks,
-              failedTaskAttempts,
-              killedTaskAttempts,
-              secondsFormat.format((duration)),
-              commaFormat.format(cpuTimeMillis),
-              commaFormat.format(gcTimeMillis),
-              commaFormat.format(hiveInputRecords),
-              commaFormat.format(hiveOutputRecords));
-        }
-        console.printInfo(vertexExecutionStats);
-      }
-    }
-  }
-
-  private void printLlapIOSummary(Map<String, Progress> progressMap, LogHelper console,
-      DAGClient dagClient) {
-    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
-    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
-    statusOptions.add(StatusGetOpts.GET_COUNTERS);
-    boolean first = false;
-    String counterGroup = LlapIOCounters.class.getName();
-    for (String vertexName : keys) {
-      // Reducers do not benefit from LLAP IO so no point in printing
-      if (vertexName.startsWith("Reducer")) {
-        continue;
-      }
-      TezCounters vertexCounters = null;
-      try {
-        vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
-            .getVertexCounters();
-      } catch (IOException e) {
-        // best attempt, shouldn't really kill DAG for this
-      } catch (TezException e) {
-        // best attempt, shouldn't really kill DAG for this
-      }
-      if (vertexCounters != null) {
-        final long selectedRowgroups = getCounterValueByGroupName(vertexCounters,
-            counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name());
-        final long metadataCacheHit = getCounterValueByGroupName(vertexCounters,
-            counterGroup, LlapIOCounters.METADATA_CACHE_HIT.name());
-        final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters,
-            counterGroup, LlapIOCounters.METADATA_CACHE_MISS.name());
-        final long cacheHitBytes = getCounterValueByGroupName(vertexCounters,
-            counterGroup, LlapIOCounters.CACHE_HIT_BYTES.name());
-        final long cacheMissBytes = getCounterValueByGroupName(vertexCounters,
-            counterGroup, LlapIOCounters.CACHE_MISS_BYTES.name());
-        final long allocatedBytes = getCounterValueByGroupName(vertexCounters,
-            counterGroup, LlapIOCounters.ALLOCATED_BYTES.name());
-        final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters,
-            counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES.name());
-        final long totalIoTime = getCounterValueByGroupName(vertexCounters,
-            counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS.name());
-
-        if (!first) {
-          console.printInfo(SEPARATOR);
-          reprintLineWithColorAsBold(LLAP_SUMMARY_HEADER, Ansi.Color.CYAN);
-          console.printInfo(SEPARATOR);
-          first = true;
-        }
-
-        String queryFragmentStats = String.format(LLAP_SUMMARY_HEADER_FORMAT,
-            vertexName,
-            selectedRowgroups,
-            metadataCacheHit,
-            metadataCacheMiss,
-            Utilities.humanReadableByteCount(cacheHitBytes),
-            Utilities.humanReadableByteCount(cacheMissBytes),
-            Utilities.humanReadableByteCount(allocatedBytes),
-            Utilities.humanReadableByteCount(allocatedUsedBytes),
-            secondsFormat.format(totalIoTime / 1000_000_000.0) + "s");
-        console.printInfo(queryFragmentStats);
-      }
-    }
-  }
-
-  private void printFSCountersSummary(Map<String, Progress> progressMap, LogHelper console,
-      DAGClient dagClient) {
-    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
-    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
-    statusOptions.add(StatusGetOpts.GET_COUNTERS);
-    // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side
-    // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap.
-    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
-      final String scheme = statistics.getScheme().toUpperCase();
-      final String fsCountersHeader = String.format(FS_COUNTERS_HEADER_FORMAT,
-          "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS");
-
-      console.printInfo("");
-      reprintLineWithColorAsBold("Scheme: " + scheme, Ansi.Color.RED);
-      console.printInfo(SEPARATOR);
-      reprintLineWithColorAsBold(fsCountersHeader, Ansi.Color.CYAN);
-      console.printInfo(SEPARATOR);
-
-      for (String vertexName : keys) {
-        TezCounters vertexCounters = null;
-        try {
-          vertexCounters = dagClient.getVertexStatus(vertexName, statusOptions)
-              .getVertexCounters();
-        } catch (IOException e) {
-          // best attempt, shouldn't really kill DAG for this
-        } catch (TezException e) {
-          // best attempt, shouldn't really kill DAG for this
-        }
-        if (vertexCounters != null) {
-          final String counterGroup = FileSystemCounter.class.getName();
-          final long bytesRead = getCounterValueByGroupName(vertexCounters,
-              counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name());
-          final long bytesWritten = getCounterValueByGroupName(vertexCounters,
-              counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name());
-          final long readOps = getCounterValueByGroupName(vertexCounters,
-              counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name());
-          final long largeReadOps = getCounterValueByGroupName(vertexCounters,
-              counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name());
-          final long writeOps = getCounterValueByGroupName(vertexCounters,
-              counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name());
-
-          String fsCountersSummary = String.format(FS_COUNTERS_HEADER_FORMAT,
-              vertexName,
-              Utilities.humanReadableByteCount(bytesRead),
-              readOps,
-              largeReadOps,
-              Utilities.humanReadableByteCount(bytesWritten),
-              writeOps);
-          console.printInfo(fsCountersSummary);
-        }
-      }
-
-      console.printInfo(SEPARATOR);
-    }
-  }
-
-  private void printStatusInPlace(Map<String, Progress> progressMap, long startTime,
-      boolean vextexStatusFromAM, DAGClient dagClient) {
-    StringBuilder reportBuffer = new StringBuilder();
-    int sumComplete = 0;
-    int sumTotal = 0;
-
-    // position the cursor to line 0
-    repositionCursor();
-
-    // print header
-    // -------------------------------------------------------------------------------
-    //         VERTICES     STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
-    // -------------------------------------------------------------------------------
-    reprintLine(SEPARATOR);
-    reprintLineWithColorAsBold(HEADER, Ansi.Color.CYAN);
-    reprintLine(SEPARATOR);
-
-    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
-    int idx = 0;
-    int maxKeys = keys.size();
-    for (String s : keys) {
-      idx++;
-      Progress progress = progressMap.get(s);
-      final int complete = progress.getSucceededTaskCount();
-      final int total = progress.getTotalTaskCount();
-      final int running = progress.getRunningTaskCount();
-      final int failed = progress.getFailedTaskAttemptCount();
-      final int pending = progress.getTotalTaskCount() - progress.getSucceededTaskCount() -
-          progress.getRunningTaskCount();
-      final int killed = progress.getKilledTaskAttemptCount();
-
-      // To get vertex status we can use DAGClient.getVertexStatus(), but it will be expensive to
-      // get status from AM for every refresh of the UI. Lets infer the state from task counts.
-      // Only if DAG is FAILED or KILLED the vertex status is fetched from AM.
-      VertexStatus.State vertexState = VertexStatus.State.INITIALIZING;
-
-      // INITED state
-      if (total > 0) {
-        vertexState = VertexStatus.State.INITED;
-        sumComplete += complete;
-        sumTotal += total;
-      }
-
-      // RUNNING state
-      if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
-        vertexState = VertexStatus.State.RUNNING;
-        if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
-          perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-        }
-      }
-
-      // SUCCEEDED state
-      if (complete == total) {
-        vertexState = VertexStatus.State.SUCCEEDED;
-        if (!completed.contains(s)) {
-          completed.add(s);
-
-            /* We may have missed the start of the vertex
-             * due to the 3 seconds interval
-             */
-          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-          }
-
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-        }
-      }
-
-      // DAG might have been killed, lets try to get vertex state from AM before dying
-      // KILLED or FAILED state
-      if (vextexStatusFromAM) {
-        VertexStatus vertexStatus = null;
-        try {
-          vertexStatus = dagClient.getVertexStatus(s, null);
-        } catch (IOException e) {
-          // best attempt, shouldn't really kill DAG for this
-        } catch (TezException e) {
-          // best attempt, shouldn't really kill DAG for this
-        }
-        if (vertexStatus != null) {
-          vertexState = vertexStatus.getState();
-        }
-      }
-
-      // Map 1 .......... container  SUCCEEDED      7          7        0        0       0       0
-      String nameWithProgress = getNameWithProgress(s, complete, total);
-      String mode = getMode(s, workMap);
-      String vertexStr = String.format(VERTEX_FORMAT,
-          nameWithProgress,
-          mode,
-          vertexState.toString(),
-          total,
-          complete,
-          running,
-          pending,
-          failed,
-          killed);
-      reportBuffer.append(vertexStr);
-      if (idx != maxKeys) {
-        reportBuffer.append("\n");
-      }
-    }
-
-    reprintMultiLine(reportBuffer.toString());
-
-    // -------------------------------------------------------------------------------
-    // VERTICES: 03/04            [=================>>-----] 86%  ELAPSED TIME: 1.71 s
-    // -------------------------------------------------------------------------------
-    reprintLine(SEPARATOR);
-    final float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal;
-    String footer = getFooter(keys.size(), completed.size(), progress, startTime);
-    reprintLineWithColorAsBold(footer, Ansi.Color.RED);
-    reprintLine(SEPARATOR);
-  }
-
-  private String getMode(String name, Map<String, BaseWork> workMap) {
-    String mode = "container";
-    BaseWork work = workMap.get(name);
-    if (work != null) {
-      // uber > llap > container
-      if (work.getUberMode()) {
-        mode = "uber";
-      } else if (work.getLlapMode()) {
-        mode = "llap";
-      } else {
-        mode = "container";
-      }
-    }
-    return mode;
-  }
-
-  // Map 1 ..........
-  private String getNameWithProgress(String s, int complete, int total) {
-    String result = "";
-    if (s != null) {
-      float percent = total == 0 ? 0.0f : (float) complete / (float) total;
-      // lets use the remaining space in column 1 as progress bar
-      int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
-      String trimmedVName = s;
-
-      // if the vertex name is longer than column 1 width, trim it down
-      // "Tez Merge File Work" will become "Tez Merge File.."
-      if (s.length() > COLUMN_1_WIDTH) {
-        trimmedVName = s.substring(0, COLUMN_1_WIDTH - 1);
-        trimmedVName = trimmedVName + "..";
-      }
-
-      result = trimmedVName + " ";
-      int toFill = (int) (spaceRemaining * percent);
-      for (int i = 0; i < toFill; i++) {
-        result += ".";
-      }
-    }
-    return result;
-  }
-
-  // VERTICES: 03/04            [==================>>-----] 86%  ELAPSED TIME: 1.71 s
-  private String getFooter(int keySize, int completedSize, float progress, long startTime) {
-    String verticesSummary = String.format("VERTICES: %02d/%02d", completedSize, keySize);
-    String progressBar = getInPlaceProgressBar(progress);
-    final int progressPercent = (int) (progress * 100);
-    String progressStr = "" + progressPercent + "%";
-    float et = (float) (System.currentTimeMillis() - startTime) / (float) 1000;
-    String elapsedTime = "ELAPSED TIME: " + secondsFormat.format(et) + " s";
-    String footer = String.format(FOOTER_FORMAT,
-        verticesSummary, progressBar, progressStr, elapsedTime);
-    return footer;
-  }
-
-  // [==================>>-----]
-  private String getInPlaceProgressBar(float percent) {
-    StringBuilder bar = new StringBuilder("[");
-    int remainingChars = progressBarChars - 4;
-    int completed = (int) (remainingChars * percent);
-    int pending = remainingChars - completed;
-    for (int i = 0; i < completed; i++) {
-      bar.append("=");
-    }
-    bar.append(">>");
-    for (int i = 0; i < pending; i++) {
-      bar.append("-");
-    }
-    bar.append("]");
-    return bar.toString();
-  }
-
-  private String printStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
-    String report = getReport(progressMap);
-    if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
-      console.printInfo(report);
-      lastPrintTime = System.currentTimeMillis();
-    }
-    return report;
-  }
-
-  private String logStatus(Map<String, Progress> progressMap, String lastReport, LogHelper console) {
-    String report = getReport(progressMap);
-    if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + printInterval) {
-      console.logInfo(report);
-      lastPrintTime = System.currentTimeMillis();
-    }
-    return report;
-  }
-
-  private String getReport(Map<String, Progress> progressMap) {
-    StringBuilder reportBuffer = new StringBuilder();
-
-    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
-    for (String s: keys) {
-      Progress progress = progressMap.get(s);
-      final int complete = progress.getSucceededTaskCount();
-      final int total = progress.getTotalTaskCount();
-      final int running = progress.getRunningTaskCount();
-      final int failed = progress.getFailedTaskAttemptCount();
-      if (total <= 0) {
-        reportBuffer.append(String.format("%s: -/-\t", s));
-      } else {
-        if (complete == total && !completed.contains(s)) {
-          completed.add(s);
-
-          /*
-           * We may have missed the start of the vertex due to the 3 seconds interval
-           */
-          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-          }
-
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-        }
-        if(complete < total && (complete > 0 || running > 0 || failed > 0)) {
-
-          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-          }
-
-          /* vertex is started, but not complete */
-          if (failed > 0) {
-            reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
-          } else {
-            reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
-          }
-        } else {
-          /* vertex is waiting for input/slots or complete */
-          if (failed > 0) {
-            /* tasks finished but some failed */
-            reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
-          } else {
-            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
-          }
-        }
-      }
-    }
-
-    return reportBuffer.toString();
-  }
-
-  public String getDiagnostics() {
-    return diagnostics.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index f1071fa..62f65c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
-
 import java.util.Collection;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -40,9 +38,7 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-
 import javax.security.auth.login.LoginException;
-
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -83,6 +79,7 @@ import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
 import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
 
 /**
  * Holds session state related to Tez
@@ -671,7 +668,7 @@ public class TezSessionState {
   }
 
   public List<LocalResource> getLocalizedResources() {
-    return new ArrayList<LocalResource>(localizedResources);
+    return new ArrayList<>(localizedResources);
   }
 
   public String getUser() {
@@ -698,4 +695,5 @@ public class TezSessionState {
       }
     } while (!ownerThread.compareAndSet(null, newName));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 7479b85..69cbe0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -79,6 +79,7 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.json.JSONObject;
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
 
 /**
  *
@@ -178,8 +179,9 @@ public class TezTask extends Task<TezWork> {
             additionalLr, inputOutputJars, inputOutputLocalResources);
 
         // finally monitor will print progress until the job is done
-        TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
-        rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
+        TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap(),dagClient, conf, dag, ctx);
+        rc = monitor.monitorExecution();
+
         if (rc != 0) {
           this.setException(new HiveException(monitor.getDiagnostics()));
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
new file mode 100644
index 0000000..eccbbb6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
@@ -0,0 +1,7 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+
+public interface Constants {
+  String SEPARATOR = new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace("\0", "-");
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
new file mode 100644
index 0000000..5840ad6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -0,0 +1,197 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.*;
+
+
+class DAGSummary implements PrintSummary {
+
+  private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdate.MIN_TERMINAL_WIDTH + 34;
+  private static final String FILE_HEADER_SEPARATOR = new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-");
+
+  private static final String FORMATTING_PATTERN = "%10s %12s %16s %13s %14s %13s %12s %14s %15s";
+  private static final String FILE_HEADER = String.format(
+      FORMATTING_PATTERN,
+      "VERTICES",
+      "TOTAL_TASKS",
+      "FAILED_ATTEMPTS",
+      "KILLED_TASKS",
+      "DURATION(ms)",
+      "CPU_TIME(ms)",
+      "GC_TIME(ms)",
+      "INPUT_RECORDS",
+      "OUTPUT_RECORDS"
+  );
+
+  private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+  private final NumberFormat commaFormatter = NumberFormat.getNumberInstance(Locale.US);
+
+  private final String hiveCountersGroup;
+  private final TezCounters hiveCounters;
+
+  private Map<String, Progress> progressMap;
+  private DAGClient dagClient;
+  private DAG dag;
+  private PerfLogger perfLogger;
+
+  DAGSummary(Map<String, Progress> progressMap, HiveConf hiveConf, DAGClient dagClient,
+             DAG dag, PerfLogger perfLogger) {
+    this.progressMap = progressMap;
+    this.dagClient = dagClient;
+    this.dag = dag;
+    this.perfLogger = perfLogger;
+    this.hiveCountersGroup = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    this.hiveCounters = hiveCounters(dagClient);
+  }
+
+  private long hiveInputRecordsFromOtherVertices(String vertexName) {
+    List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices();
+    long result = 0;
+    for (Vertex inputVertex : inputVerticesList) {
+      String intermediateRecordsCounterName = formattedName(
+          ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
+          inputVertex.getName()
+      );
+      String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
+          inputVertex.getName());
+      result += (
+          hiveCounterValue(intermediateRecordsCounterName)
+              + hiveCounterValue(recordsOutCounterName)
+      );
+    }
+    return result;
+  }
+
+  private String formattedName(String counterName, String vertexName) {
+    return String.format("%s_", counterName) + vertexName.replace(" ", "_");
+  }
+
+  private long getCounterValueByGroupName(TezCounters counters, String pattern, String counterName) {
+    TezCounter tezCounter = counters.getGroup(pattern).findCounter(counterName);
+    return (tezCounter == null) ? 0 : tezCounter.getValue();
+  }
+
+  private long hiveCounterValue(String counterName) {
+    return getCounterValueByGroupName(hiveCounters, hiveCountersGroup, counterName);
+  }
+
+  private TezCounters hiveCounters(DAGClient dagClient) {
+    try {
+      return dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS)).getDAGCounters();
+    } catch (IOException | TezException e) {
+      // best attempt, shouldn't really kill DAG for this
+    }
+    return null;
+  }
+
+  @Override
+  public void print(SessionState.LogHelper console) {
+    console.printInfo("Task Execution Summary");
+
+  /* If the counters are missing there is no point trying to print progress */
+    if (hiveCounters == null) {
+      return;
+    }
+
+  /* Print the per Vertex summary */
+    printHeader(console);
+    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    for (String vertexName : keys) {
+      Progress progress = progressMap.get(vertexName);
+      if (progress == null) continue;
+
+      VertexStatus vertexStatus = vertexStatus(statusOptions, vertexName);
+      if (vertexStatus == null) {
+        continue;
+      }
+      console.printInfo(vertexSummary(vertexName, progress, vertexStatus));
+    }
+    console.printInfo(FILE_HEADER_SEPARATOR);
+  }
+
+  private String vertexSummary(String vertexName, Progress progress, VertexStatus vertexStatus) {
+  /*
+   * Get the CPU & GC
+   *
+   * counters org.apache.tez.common.counters.TaskCounter
+   *  GC_TIME_MILLIS=37712
+   *  CPU_MILLISECONDS=2774230
+   */
+    final TezCounters vertexCounters = vertexStatus.getVertexCounters();
+    final double cpuTimeMillis = getCounterValueByGroupName(vertexCounters,
+        TaskCounter.class.getName(),
+        TaskCounter.CPU_MILLISECONDS.name());
+
+    final double gcTimeMillis = getCounterValueByGroupName(vertexCounters,
+        TaskCounter.class.getName(),
+        TaskCounter.GC_TIME_MILLIS.name());
+
+    /*
+     * Get the HIVE counters
+     *
+     * HIVE
+     *  CREATED_FILES=1
+     *  DESERIALIZE_ERRORS=0
+     *  RECORDS_IN_Map_1=550076554
+     *  RECORDS_OUT_INTERMEDIATE_Map_1=854987
+     *  RECORDS_OUT_Reducer_2=1
+     */
+    final long hiveInputRecords =
+        hiveCounterValue(formattedName(MapOperator.Counter.RECORDS_IN.toString(), vertexName))
+            + hiveInputRecordsFromOtherVertices(vertexName);
+
+    final long hiveOutputRecords =
+        hiveCounterValue(formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(), vertexName)) +
+            hiveCounterValue(formattedName(ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), vertexName));
+
+    final double duration = perfLogger.getDuration(PerfLogger.TEZ_RUN_VERTEX + vertexName);
+
+    return String.format(FORMATTING_PATTERN,
+        vertexName,
+        progress.getTotalTaskCount(),
+        progress.getFailedTaskAttemptCount(),
+        progress.getKilledTaskAttemptCount(),
+        secondsFormatter.format((duration)),
+        commaFormatter.format(cpuTimeMillis),
+        commaFormatter.format(gcTimeMillis),
+        commaFormatter.format(hiveInputRecords),
+        commaFormatter.format(hiveOutputRecords));
+  }
+
+  private VertexStatus vertexStatus(Set<StatusGetOpts> statusOptions, String vertexName) {
+    try {
+      return dagClient.getVertexStatus(vertexName, statusOptions);
+    } catch (IOException | TezException e) {
+      // best attempt, shouldn't really kill DAG for this
+    }
+    return null;
+  }
+
+  private void printHeader(SessionState.LogHelper console) {
+    console.printInfo(FILE_HEADER_SEPARATOR);
+    console.printInfo(FILE_HEADER);
+    console.printInfo(FILE_HEADER_SEPARATOR);
+  }
+}