You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2017/02/21 20:48:30 UTC

hive git commit: HIVE-15847 : In Progress update refreshes seem slow (Anishek Agarwal, via Thejas Nair)

Repository: hive
Updated Branches:
  refs/heads/master 7fa8e37fd -> e17a0409c


HIVE-15847 : In Progress update refreshes seem slow (Anishek Agarwal, via Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e17a0409
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e17a0409
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e17a0409

Branch: refs/heads/master
Commit: e17a0409c920bcf75d8915047c3318d621e2ced0
Parents: 7fa8e37
Author: Anishek Agarwal <an...@gmail.com>
Authored: Tue Feb 21 12:48:19 2017 -0800
Committer: Thejas M Nair <th...@hortonworks.com>
Committed: Tue Feb 21 12:48:25 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hive/common/log/InPlaceUpdate.java   |   1 +
 .../hive/ql/exec/tez/monitoring/DAGSummary.java |  12 +-
 .../ql/exec/tez/monitoring/RenderStrategy.java  | 154 +++++++++++++++++++
 .../ql/exec/tez/monitoring/TezJobMonitor.java   | 110 ++-----------
 4 files changed, 168 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e17a0409/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
index bfdb4fa..6db5c18 100644
--- a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
+++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java
@@ -163,6 +163,7 @@ public class InPlaceUpdate {
       progressStr,
       elapsedTime);
 
+    reprintLine(SEPARATOR);
     reprintLineWithColorAsBold(footer, Ansi.Color.RED);
     reprintLine(SEPARATOR);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e17a0409/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
index 5840ad6..1400be4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -26,16 +26,11 @@ import java.util.*;
 
 class DAGSummary implements PrintSummary {
 
-  private static final int FILE_HEADER_SEPARATOR_WIDTH = InPlaceUpdate.MIN_TERMINAL_WIDTH + 34;
-  private static final String FILE_HEADER_SEPARATOR = new String(new char[FILE_HEADER_SEPARATOR_WIDTH]).replace("\0", "-");
-
-  private static final String FORMATTING_PATTERN = "%10s %12s %16s %13s %14s %13s %12s %14s %15s";
+  private static final String FILE_HEADER_SEPARATOR = new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace("\0", "-");
+  private static final String FORMATTING_PATTERN = "%10s %17s %14s %14s %15s %16s";
   private static final String FILE_HEADER = String.format(
       FORMATTING_PATTERN,
       "VERTICES",
-      "TOTAL_TASKS",
-      "FAILED_ATTEMPTS",
-      "KILLED_TASKS",
       "DURATION(ms)",
       "CPU_TIME(ms)",
       "GC_TIME(ms)",
@@ -170,9 +165,6 @@ class DAGSummary implements PrintSummary {
 
     return String.format(FORMATTING_PATTERN,
         vertexName,
-        progress.getTotalTaskCount(),
-        progress.getFailedTaskAttemptCount(),
-        progress.getKilledTaskAttemptCount(),
         secondsFormatter.format((duration)),
         commaFormatter.format(cpuTimeMillis),
         commaFormatter.format(gcTimeMillis),

http://git-wip-us.apache.org/repos/asf/hive/blob/e17a0409/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
new file mode 100644
index 0000000..bb9a5e7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java
@@ -0,0 +1,154 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+// NOTE(review): no ASF license header; Apache source files normally start with one.
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+/** How TezJobMonitor renders DAG progress: in place (hive-cli) or via session state (HS2). */
+class RenderStrategy {
+  /** Receives the latest DAG status and per-vertex progress from TezJobMonitor. */
+  interface UpdateFunction {
+    void update(DAGStatus status, Map<String, Progress> vertexProgressMap);
+  }
+  /** Renders progress on every call; the text report is throttled (see showReport). */
+  private abstract static class BaseUpdateFunction implements UpdateFunction {
+    private static final int PRINT_INTERVAL = 3000; // milliseconds
+
+    final TezJobMonitor monitor; // read by subclasses (console access)
+    private final PerfLogger perfLogger;
+
+    private long lastPrintTime = 0L; // wall-clock ms of the last printed report
+    private String lastReport = null; // last printed report; null before the first print
+
+    BaseUpdateFunction(TezJobMonitor monitor) {
+      this.monitor = monitor;
+      perfLogger = SessionState.getPerfLogger();
+    }
+
+    @Override
+    public void update(DAGStatus status, Map<String, Progress> vertexProgressMap) {
+      renderProgress(monitor.progressMonitor(status, vertexProgressMap)); // not throttled
+      String report = getReport(vertexProgressMap);
+      if (showReport(report)) { // only the text report is throttled
+        renderReport(report);
+        lastReport = report;
+        lastPrintTime = System.currentTimeMillis();
+      }
+    }
+
+    private boolean showReport(String report) { // changed, or PRINT_INTERVAL elapsed
+      return !report.equals(lastReport)
+          || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL;
+    }
+    /** Builds the report (vertices sorted by name); also records TEZ_RUN_VERTEX perf timings. */
+    private String getReport(Map<String, Progress> progressMap) {
+      StringWriter reportBuffer = new StringWriter(); // NOTE(review): a StringBuilder would do
+
+      SortedSet<String> keys = new TreeSet<String>(progressMap.keySet()); // sorted by vertex name
+      for (String s : keys) {
+        Progress progress = progressMap.get(s);
+        final int complete = progress.getSucceededTaskCount();
+        final int total = progress.getTotalTaskCount();
+        final int running = progress.getRunningTaskCount();
+        final int failed = progress.getFailedTaskAttemptCount(); // attempts, not tasks
+        if (total <= 0) {
+          reportBuffer.append(String.format("%s: -/-\t", s));
+        } else {
+          if (complete == total) {
+            // Its start may never have been observed if the vertex finished between two
+            // updates; begin it now so the PerfLogEnd below has a start to pair with.
+            // NOTE(review): runs on every update once complete; confirm repeat PerfLogEnd is OK.
+            if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+              perfLogger.PerfLogBegin(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+            }
+
+            perfLogger.PerfLogEnd(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+          }
+          if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+
+            if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+              perfLogger.PerfLogBegin(TezJobMonitor.CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+            }
+
+            /* Vertex has started but is not complete: "+" is running, "-" is failed attempts. */
+            if (failed > 0) {
+              reportBuffer.append(
+                  String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+            } else {
+              reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+            }
+          } else {
+            /* Vertex not started yet (e.g. waiting for input/slots) or already complete. */
+            if (failed > 0) {
+              /* All tasks succeeded, but some task attempts failed along the way. */
+              reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
+            } else {
+              reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+            }
+          }
+        }
+      }
+
+      return reportBuffer.toString();
+    }
+
+    abstract void renderProgress(ProgressMonitor progressMonitor); // called on every update
+
+    abstract void renderReport(String report); // called only when showReport(report) is true
+  }
+
+  /**
+   * Stores progress in the session state, which HS2 uses to send the same information to the
+   * beeline client on request. The text report is printed via {@code console.printInfo}.
+   */
+  static class LogToFileFunction extends BaseUpdateFunction {
+
+    LogToFileFunction(TezJobMonitor monitor) {
+      super(monitor);
+    }
+
+    @Override
+    public void renderProgress(ProgressMonitor progressMonitor) {
+      SessionState.get().updateProgressMonitor(progressMonitor);
+    }
+
+    @Override
+    public void renderReport(String report) {
+      monitor.console.printInfo(report);
+    }
+  }
+
+  /**
+   * Used when progress should be rendered in place by this process, typically in hive-cli mode.
+   * The text report goes to {@code console.logInfo} rather than {@code printInfo}.
+   */
+  static class InPlaceUpdateFunction extends BaseUpdateFunction {
+    /**
+     * The same instance must be reused for every render; otherwise the number of lines printed
+     * earlier is lost and the table is printed again and again instead of being redrawn.
+     */
+    private final InPlaceUpdate inPlaceUpdate;
+
+    InPlaceUpdateFunction(TezJobMonitor monitor) {
+      super(monitor);
+      inPlaceUpdate = new InPlaceUpdate(SessionState.LogHelper.getInfoStream());
+    }
+
+    @Override
+    public void renderProgress(ProgressMonitor progressMonitor) {
+      inPlaceUpdate.render(progressMonitor);
+    }
+
+    @Override
+    public void renderReport(String report) {
+      monitor.console.logInfo(report);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e17a0409/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
index c0a068d..f2f97f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -47,8 +47,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
 
 import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
 
@@ -59,24 +57,18 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
  */
 public class TezJobMonitor {
 
-  private static final String CLASS_NAME = TezJobMonitor.class.getName();
+  static final String CLASS_NAME = TezJobMonitor.class.getName();
   private static final int CHECK_INTERVAL = 200;
   private static final int MAX_RETRY_INTERVAL = 2500;
-  private static final int PRINT_INTERVAL = 3000;
 
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final List<DAGClient> shutdownList;
   private final Map<String, BaseWork> workMap;
 
-  private transient LogHelper console;
+  transient LogHelper console;
 
-  private long lastPrintTime;
   private StringWriter diagnostics = new StringWriter();
 
-  interface UpdateFunction {
-    void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report);
-  }
-
   static {
     shutdownList = new LinkedList<>();
     ShutdownHookManager.addShutdownHook(new Runnable() {
@@ -102,12 +94,7 @@ public class TezJobMonitor {
   private final DAG dag;
   private final Context context;
   private long executionStartTime = 0;
-  private final UpdateFunction updateFunction;
-  /**
-   * Have to use the same instance to render else the number lines printed earlier is lost and the
-   * screen will print the table again and again.
-   */
-  private final InPlaceUpdate inPlaceUpdate;
+  private final RenderStrategy.UpdateFunction updateFunction;
 
   public TezJobMonitor(Map<String, BaseWork> workMap, final DAGClient dagClient, HiveConf conf, DAG dag,
                        Context ctx) {
@@ -117,29 +104,15 @@ public class TezJobMonitor {
     this.dag = dag;
     this.context = ctx;
     console = SessionState.getConsole();
-    inPlaceUpdate = new InPlaceUpdate(LogHelper.getInfoStream());
     updateFunction = updateFunction();
   }
 
-  private UpdateFunction updateFunction() {
-    UpdateFunction logToFileFunction = new UpdateFunction() {
-      @Override
-      public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
-        SessionState.get().updateProgressMonitor(progressMonitor(status, vertexProgressMap));
-        console.printInfo(report);
-      }
-    };
-    UpdateFunction inPlaceUpdateFunction = new UpdateFunction() {
-      @Override
-      public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
-        inPlaceUpdate.render(progressMonitor(status, vertexProgressMap));
-        console.logInfo(report);
-      }
-    };
+  private RenderStrategy.UpdateFunction updateFunction() {
     return InPlaceUpdate.canRenderInPlace(hiveConf)
         && !SessionState.getConsole().getIsSilent()
         && !SessionState.get().isHiveServerQuery()
-        ? inPlaceUpdateFunction : logToFileFunction;
+        ? new RenderStrategy.InPlaceUpdateFunction(this)
+        : new RenderStrategy.LogToFileFunction(this);
   }
 
   private boolean isProfilingEnabled() {
@@ -163,7 +136,6 @@ public class TezJobMonitor {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
     DAGStatus.State lastState = null;
-    String lastReport = null;
     boolean running = false;
 
     while (true) {
@@ -195,13 +167,13 @@ public class TezJobMonitor {
                 this.executionStartTime = System.currentTimeMillis();
                 running = true;
               }
-              lastReport = updateStatus(status, vertexProgressMap, lastReport);
+              updateFunction.update(status, vertexProgressMap);
               break;
             case SUCCEEDED:
               if (!running) {
                 this.executionStartTime = monitorStartTime;
               }
-              lastReport = updateStatus(status, vertexProgressMap, lastReport);
+              updateFunction.update(status, vertexProgressMap);
               success = true;
               running = false;
               done = true;
@@ -210,7 +182,7 @@ public class TezJobMonitor {
               if (!running) {
                 this.executionStartTime = monitorStartTime;
               }
-              lastReport = updateStatus(status, vertexProgressMap, lastReport);
+              updateFunction.update(status, vertexProgressMap);
               console.printInfo("Status: Killed");
               running = false;
               done = true;
@@ -221,7 +193,7 @@ public class TezJobMonitor {
               if (!running) {
                 this.executionStartTime = monitorStartTime;
               }
-              lastReport = updateStatus(status, vertexProgressMap, lastReport);
+              updateFunction.update(status, vertexProgressMap);
               console.printError("Status: Failed");
               running = false;
               done = true;
@@ -323,71 +295,11 @@ public class TezJobMonitor {
     return (tezCounter == null) ? 0 : tezCounter.getValue();
   }
 
-  private String updateStatus(DAGStatus status, Map<String, Progress> vertexProgressMap,
-      String lastReport) {
-    String report = getReport(vertexProgressMap);
-    if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL) {
-      updateFunction.update(status, vertexProgressMap, report);
-      lastPrintTime = System.currentTimeMillis();
-    }
-    return report;
-  }
-
-  private String getReport(Map<String, Progress> progressMap) {
-    StringBuilder reportBuffer = new StringBuilder();
-
-    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
-    for (String s : keys) {
-      Progress progress = progressMap.get(s);
-      final int complete = progress.getSucceededTaskCount();
-      final int total = progress.getTotalTaskCount();
-      final int running = progress.getRunningTaskCount();
-      final int failed = progress.getFailedTaskAttemptCount();
-      if (total <= 0) {
-        reportBuffer.append(String.format("%s: -/-\t", s));
-      } else {
-        if (complete == total) {
-          /*
-           * We may have missed the start of the vertex due to the 3 seconds interval
-           */
-          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-          }
-
-          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-        }
-        if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
-
-          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
-            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
-          }
-
-          /* vertex is started, but not complete */
-          if (failed > 0) {
-            reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
-          } else {
-            reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
-          }
-        } else {
-          /* vertex is waiting for input/slots or complete */
-          if (failed > 0) {
-            /* tasks finished but some failed */
-            reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
-          } else {
-            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
-          }
-        }
-      }
-    }
-
-    return reportBuffer.toString();
-  }
-
   public String getDiagnostics() {
     return diagnostics.toString();
   }
 
-  private ProgressMonitor progressMonitor(DAGStatus status, Map<String, Progress> progressMap) {
+  ProgressMonitor progressMonitor(DAGStatus status, Map<String, Progress> progressMap) {
     try {
       return new TezProgressMonitor(dagClient, status, workMap, progressMap, console,
           executionStartTime);