You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/07 20:59:35 UTC

[67/70] [abbrv] hive git commit: HIVE-15473: Progress Bar on Beeline client (Anishek Agarwal via Thejas Nair)

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
new file mode 100644
index 0000000..0a28edd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
@@ -0,0 +1,128 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;
+
+/**
+ * Prints, for every file system scheme reported by {@link FileSystem#getAllStatistics()}, a
+ * per-vertex table of the Tez {@link FileSystemCounter} values.
+ */
+public class FSCountersSummary implements PrintSummary {
+
+  private static final String FORMATTING_PATTERN = "%10s %15s %13s %18s %18s %13s";
+  private static final String HEADER = String.format(FORMATTING_PATTERN,
+      "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS");
+
+  private final Map<String, Progress> progressMap;
+  private final DAGClient dagClient;
+
+  FSCountersSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
+    this.progressMap = progressMap;
+    this.dagClient = dagClient;
+  }
+
+  /**
+   * Prints one table per file system scheme with one row per vertex, ordered by vertex name.
+   * Vertices whose counters could not be fetched are left out.
+   *
+   * @param console sink the summary lines are written to
+   */
+  @Override
+  public void print(SessionState.LogHelper console) {
+    console.printInfo("FileSystem Counters Summary");
+
+    // The counters of a vertex do not depend on the scheme, so fetch them once per vertex
+    // instead of repeating the same DAG client call for every scheme.
+    Map<String, TezCounters> countersByVertex = fetchVertexCounters();
+
+    // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side
+    // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap.
+    for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
+      // Locale.ROOT keeps the counter name prefix independent of the JVM default locale; under a
+      // Turkish default locale "file" upper-cases to a dotted capital I and no counter would match.
+      final String scheme = statistics.getScheme().toUpperCase(Locale.ROOT);
+
+      console.printInfo("");
+      console.printInfo("Scheme: " + scheme);
+      console.printInfo(SEPARATOR);
+      console.printInfo(HEADER);
+      console.printInfo(SEPARATOR);
+
+      for (Map.Entry<String, TezCounters> vertex : countersByVertex.entrySet()) {
+        console.printInfo(summary(scheme, vertex.getKey(), vertex.getValue()));
+      }
+
+      console.printInfo(SEPARATOR);
+    }
+  }
+
+  /**
+   * Fetches the counters of every vertex named in the progress map.
+   *
+   * @return counters keyed by vertex name in sorted order; vertices without counters are absent
+   */
+  private Map<String, TezCounters> fetchVertexCounters() {
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+
+    Map<String, TezCounters> countersByVertex = new TreeMap<>();
+    for (String vertexName : progressMap.keySet()) {
+      TezCounters counters = vertexCounters(statusOptions, vertexName);
+      if (counters != null) {
+        countersByVertex.put(vertexName, counters);
+      }
+    }
+    return countersByVertex;
+  }
+
+  /** Formats the table row holding the {@code scheme} counters of one vertex. */
+  private String summary(String scheme, String vertexName, TezCounters vertexCounters) {
+    final long bytesRead = counterValue(vertexCounters, scheme, FileSystemCounter.BYTES_READ);
+    final long bytesWritten = counterValue(vertexCounters, scheme, FileSystemCounter.BYTES_WRITTEN);
+    final long readOps = counterValue(vertexCounters, scheme, FileSystemCounter.READ_OPS);
+    final long largeReadOps = counterValue(vertexCounters, scheme, FileSystemCounter.LARGE_READ_OPS);
+    final long writeOps = counterValue(vertexCounters, scheme, FileSystemCounter.WRITE_OPS);
+
+    return String.format(FORMATTING_PATTERN,
+        vertexName,
+        Utilities.humanReadableByteCount(bytesRead),
+        readOps,
+        largeReadOps,
+        Utilities.humanReadableByteCount(bytesWritten),
+        writeOps);
+  }
+
+  /** Reads the counter named {@code <scheme>_<counter>} from the file system counter group. */
+  private static long counterValue(TezCounters vertexCounters, String scheme,
+      FileSystemCounter counter) {
+    return getCounterValueByGroupName(vertexCounters, FileSystemCounter.class.getName(),
+        scheme + "_" + counter.name());
+  }
+
+  /**
+   * Fetches the counters of a single vertex.
+   *
+   * @return the vertex counters, or {@code null} when the status request fails
+   */
+  private TezCounters vertexCounters(Set<StatusGetOpts> statusOptions, String vertexName) {
+    try {
+      return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
+    } catch (IOException | TezException ignored) {
+      // best attempt, shouldn't really kill DAG for this
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
new file mode 100644
index 0000000..81f1755
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
@@ -0,0 +1,126 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;
+
+/**
+ * Prints a per-vertex table of the LLAP IO counters ({@link LlapIOCounters}). Vertices whose name
+ * starts with "Reducer" are skipped.
+ */
+public class LLAPioSummary implements PrintSummary {
+
+  private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s";
+  private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
+  private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
+      "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
+      "ALLOCATION", "USED", "TOTAL_IO");
+
+  private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+  private final Map<String, Progress> progressMap;
+  private final DAGClient dagClient;
+
+  LLAPioSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
+    this.progressMap = progressMap;
+    this.dagClient = dagClient;
+  }
+
+  /**
+   * Prints the LLAP IO table. The table header is emitted lazily, right before the first vertex
+   * row, so no header is printed when no vertex has counters.
+   *
+   * @param console sink the summary lines are written to
+   */
+  @Override
+  public void print(SessionState.LogHelper console) {
+    console.printInfo("");
+    console.printInfo(LLAP_IO_SUMMARY_HEADER);
+
+    SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    String counterGroup = LlapIOCounters.class.getName();
+    // Local rather than a field, so that every call to print() emits the table header again.
+    boolean headerPrinted = false;
+    for (String vertexName : keys) {
+      // Reducers do not benefit from LLAP IO so no point in printing
+      if (vertexName.startsWith("Reducer")) {
+        continue;
+      }
+      TezCounters vertexCounters = vertexCounter(statusOptions, vertexName);
+      if (vertexCounters == null) {
+        continue;
+      }
+      if (!headerPrinted) {
+        console.printInfo(SEPARATOR);
+        console.printInfo(LLAP_SUMMARY_HEADER);
+        console.printInfo(SEPARATOR);
+        headerPrinted = true;
+      }
+      console.printInfo(vertexSummary(vertexName, counterGroup, vertexCounters));
+    }
+    console.printInfo(SEPARATOR);
+    console.printInfo("");
+  }
+
+  /**
+   * Formats the table row of one vertex: byte counters are made human readable and the total IO
+   * time, read from the nanosecond counter, is shown in seconds.
+   */
+  private String vertexSummary(String vertexName, String counterGroup, TezCounters vertexCounters) {
+    final long selectedRowgroups = getCounterValueByGroupName(vertexCounters,
+        counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name());
+    final long metadataCacheHit = getCounterValueByGroupName(vertexCounters,
+        counterGroup, LlapIOCounters.METADATA_CACHE_HIT.name());
+    final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters,
+        counterGroup, LlapIOCounters.METADATA_CACHE_MISS.name());
+    final long cacheHitBytes = getCounterValueByGroupName(vertexCounters,
+        counterGroup, LlapIOCounters.CACHE_HIT_BYTES.name());
+    final long cacheMissBytes = getCounterValueByGroupName(vertexCounters,
+        counterGroup, LlapIOCounters.CACHE_MISS_BYTES.name());
+    final long allocatedBytes = getCounterValueByGroupName(vertexCounters,
+        counterGroup, LlapIOCounters.ALLOCATED_BYTES.name());
+    final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters,
+        counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES.name());
+    final long totalIoTime = getCounterValueByGroupName(vertexCounters,
+        counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS.name());
+
+    return String.format(LLAP_SUMMARY_HEADER_FORMAT,
+        vertexName,
+        selectedRowgroups,
+        metadataCacheHit,
+        metadataCacheMiss,
+        Utilities.humanReadableByteCount(cacheHitBytes),
+        Utilities.humanReadableByteCount(cacheMissBytes),
+        Utilities.humanReadableByteCount(allocatedBytes),
+        Utilities.humanReadableByteCount(allocatedUsedBytes),
+        secondsFormatter.format(totalIoTime / 1000_000_000.0) + "s");
+  }
+
+  /**
+   * Fetches the counters of a single vertex.
+   *
+   * @return the vertex counters, or {@code null} when the status request fails
+   */
+  private TezCounters vertexCounter(Set<StatusGetOpts> statusOptions, String vertexName) {
+    try {
+      return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
+    } catch (IOException | TezException ignored) {
+      // best attempt, shouldn't really kill DAG for this
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
new file mode 100644
index 0000000..6311335
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
@@ -0,0 +1,7 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+/** A summary section that prints itself to the given console (e.g. {@link LLAPioSummary}). */
+interface PrintSummary {
+  void print(SessionState.LogHelper console);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
new file mode 100644
index 0000000..1625ce1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
@@ -0,0 +1,88 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.text.DecimalFormat;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+
+/**
+ * Prints how long each phase of a query took (compile, plan preparation, plan submission, DAG
+ * start and DAG run), based on the timings recorded in a {@link PerfLogger}.
+ */
+class QueryExecutionBreakdownSummary implements PrintSummary {
+  // Methods summary
+  private static final String OPERATION_SUMMARY = "%-35s %9s";
+  private static final String OPERATION = "OPERATION";
+  private static final String DURATION = "DURATION";
+
+  private final DecimalFormat decimalFormat = new DecimalFormat("#0.00");
+  private final PerfLogger perfLogger;
+
+  private final long compileEndTime;
+  private final long dagSubmitStartTime;
+  private final long submitToRunningDuration;
+
+  QueryExecutionBreakdownSummary(PerfLogger perfLogger) {
+    this.perfLogger = perfLogger;
+    this.compileEndTime = perfLogger.getEndTime(PerfLogger.COMPILE);
+    this.dagSubmitStartTime = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG);
+    this.submitToRunningDuration = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+  }
+
+  /** Divides by 1000 and appends "s" (the perf logger values are presumably milliseconds). */
+  private String formatNumber(long number) {
+    return decimalFormat.format(number / 1000.0) + "s";
+  }
+
+  /** Formats one table row: the operation name followed by its formatted duration. */
+  private String format(String value, long number) {
+    return String.format(OPERATION_SUMMARY, value, formatNumber(number));
+  }
+
+  /**
+   * Prints the breakdown table, one row per execution phase.
+   *
+   * @param console sink the summary lines are written to
+   */
+  @Override
+  public void print(SessionState.LogHelper console) {
+    console.printInfo("Query Execution Summary");
+
+    String execBreakdownHeader = String.format(OPERATION_SUMMARY, OPERATION, DURATION);
+    console.printInfo(SEPARATOR);
+    console.printInfo(execBreakdownHeader);
+    console.printInfo(SEPARATOR);
+
+    // parse, analyze, optimize and compile
+    long compile = compileEndTime - perfLogger.getStartTime(PerfLogger.COMPILE);
+    console.printInfo(format("Compile Query", compile));
+
+    // prepare plan for submission (building DAG, adding resources, creating scratch dirs etc.)
+    long totalDAGPrep = dagSubmitStartTime - compileEndTime;
+    console.printInfo(format("Prepare Plan", totalDAGPrep));
+
+    // submit to accept dag (if session is closed, this will include re-opening of session time,
+    // localizing files for AM, submitting DAG)
+    long submitToAccept = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) - dagSubmitStartTime;
+    console.printInfo(format("Submit Plan", submitToAccept));
+
+    // accept to start dag (schedule wait time, resource wait time etc.)
+    console.printInfo(format("Start DAG", submitToRunningDuration));
+
+    // time to actually run the dag (actual dag runtime)
+    // NOTE(review): a zero duration presumably means the RUNNING state was never observed, so
+    // the end time of TEZ_SUBMIT_TO_RUNNING is unusable; fall back to the whole run duration.
+    final long startToEnd;
+    if (submitToRunningDuration == 0) {
+      startToEnd = perfLogger.getDuration(PerfLogger.TEZ_RUN_DAG);
+    } else {
+      startToEnd = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG) -
+          perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+    }
+    console.printInfo(format("Run DAG", startToEnd));
+    console.printInfo(SEPARATOR);
+    console.printInfo("");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
new file mode 100644
index 0000000..1e54f6e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -0,0 +1,442 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+  <p>
+  http://www.apache.org/licenses/LICENSE-2.0
+  <p>
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.StringWriter;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
+
+/**
+ * TezJobMonitor keeps track of a tez job while it's being executed. It will
+ * print status to the console and retrieve final status of the job after
+ * completion.
+ */
+public class TezJobMonitor {
+
+  private static final String CLASS_NAME = TezJobMonitor.class.getName();
+  /** Timeout handed to every DAG status request (presumably milliseconds). */
+  private static final int CHECK_INTERVAL = 200;
+  /** Divided by CHECK_INTERVAL this gives the failure count at which monitoring gives up. */
+  private static final int MAX_RETRY_INTERVAL = 2500;
+  /** An unchanged progress report is printed again once this many milliseconds have passed. */
+  private static final int PRINT_INTERVAL = 3000;
+
+  private final PerfLogger perfLogger = SessionState.getPerfLogger();
+  private static final List<DAGClient> shutdownList;
+  private final Map<String, BaseWork> workMap;
+
+  private transient LogHelper console;
+
+  private long lastPrintTime;
+  private final StringWriter diagnostics = new StringWriter();
+
+  /** Strategy for publishing a progress report (in-place rendering or plain printing). */
+  interface UpdateFunction {
+    void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report);
+  }
+
+  static {
+    shutdownList = new LinkedList<>();
+    ShutdownHookManager.addShutdownHook(new Runnable() {
+      @Override
+      public void run() {
+        TezJobMonitor.killRunningJobs();
+        try {
+          TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
+        } catch (Exception e) {
+          // ignore
+        }
+      }
+    });
+  }
+
+  /**
+   * Touching this class runs its static initializer, which registers the shutdown hook that
+   * kills the DAGs still being monitored; this method only asserts that this has happened.
+   */
+  public static void initShutdownHook() {
+    Preconditions.checkNotNull(shutdownList,
+      "Shutdown hook was not properly initialized");
+  }
+
+  private final DAGClient dagClient;
+  private final HiveConf hiveConf;
+  private final DAG dag;
+  private final Context context;
+  private long executionStartTime = 0;
+  private final UpdateFunction updateFunction;
+  /**
+   * Have to use the same instance to render else the number lines printed earlier is lost and the
+   * screen will print the table again and again.
+   */
+  private final InPlaceUpdate inPlaceUpdate;
+
+  public TezJobMonitor(Map<String, BaseWork> workMap, final DAGClient dagClient, HiveConf conf, DAG dag,
+                       Context ctx) {
+    this.workMap = workMap;
+    this.dagClient = dagClient;
+    this.hiveConf = conf;
+    this.dag = dag;
+    this.context = ctx;
+    console = SessionState.getConsole();
+    inPlaceUpdate = new InPlaceUpdate(LogHelper.getInfoStream());
+    updateFunction = updateFunction();
+  }
+
+  /**
+   * Picks how progress is published: rendered in place when the console supports it, the session
+   * is not silent and the query does not come through HiveServer; otherwise printed as plain
+   * report lines while the progress monitor of the session is updated.
+   */
+  private UpdateFunction updateFunction() {
+    UpdateFunction logToFileFunction = new UpdateFunction() {
+      @Override
+      public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
+        SessionState.get().updateProgressMonitor(progressMonitor(status, vertexProgressMap));
+        console.printInfo(report);
+      }
+    };
+    UpdateFunction inPlaceUpdateFunction = new UpdateFunction() {
+      @Override
+      public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
+        inPlaceUpdate.render(progressMonitor(status, vertexProgressMap));
+        console.logInfo(report);
+      }
+    };
+    return InPlaceUpdate.canRenderInPlace(hiveConf)
+        && !SessionState.getConsole().getIsSilent()
+        && !SessionState.get().isHiveServerQuery()
+        ? inPlaceUpdateFunction : logToFileFunction;
+  }
+
+  /** True when TEZ_EXEC_SUMMARY is set or Utilities.isPerfOrAboveLogging(hiveConf) holds. */
+  private boolean isProfilingEnabled() {
+    return HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
+      Utilities.isPerfOrAboveLogging(hiveConf);
+  }
+
+  /**
+   * Polls the DAG status until the DAG reaches a terminal state, publishing progress while it
+   * runs. A failing status request is retried; the DAG is killed when the failure was caused by
+   * an interrupt or once the failure count reaches a multiple of
+   * {@code MAX_RETRY_INTERVAL / CHECK_INTERVAL}.
+   *
+   * @return 0 if the DAG succeeded, 1 if it was killed or monitoring gave up, 2 if it failed
+   */
+  public int monitorExecution() {
+    boolean done = false;
+    boolean success = false;
+    int failedCounter = 0;
+    int rc = 0;
+    DAGStatus status = null;
+    Map<String, Progress> vertexProgressMap = null;
+
+
+    long monitorStartTime = System.currentTimeMillis();
+    synchronized (shutdownList) {
+      shutdownList.add(dagClient);
+    }
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+    DAGStatus.State lastState = null;
+    String lastReport = null;
+    boolean running = false;
+
+    while (true) {
+
+      try {
+        if (context != null) {
+          context.checkHeartbeaterLockException();
+        }
+
+        status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), CHECK_INTERVAL);
+        vertexProgressMap = status.getVertexProgress();
+        DAGStatus.State state = status.getState();
+
+        if (state != lastState || state == RUNNING) {
+          lastState = state;
+
+          switch (state) {
+            case SUBMITTED:
+              console.printInfo("Status: Submitted");
+              break;
+            case INITING:
+              console.printInfo("Status: Initializing");
+              this.executionStartTime = System.currentTimeMillis();
+              break;
+            case RUNNING:
+              if (!running) {
+                perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+                console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
+                this.executionStartTime = System.currentTimeMillis();
+                running = true;
+              }
+              lastReport = updateStatus(status, vertexProgressMap, lastReport);
+              break;
+            case SUCCEEDED:
+              if (!running) {
+                this.executionStartTime = monitorStartTime;
+              }
+              lastReport = updateStatus(status, vertexProgressMap, lastReport);
+              success = true;
+              running = false;
+              done = true;
+              break;
+            case KILLED:
+              if (!running) {
+                this.executionStartTime = monitorStartTime;
+              }
+              lastReport = updateStatus(status, vertexProgressMap, lastReport);
+              console.printInfo("Status: Killed");
+              running = false;
+              done = true;
+              rc = 1;
+              break;
+            case FAILED:
+            case ERROR:
+              if (!running) {
+                this.executionStartTime = monitorStartTime;
+              }
+              lastReport = updateStatus(status, vertexProgressMap, lastReport);
+              console.printError("Status: Failed");
+              running = false;
+              done = true;
+              rc = 2;
+              break;
+          }
+        }
+      } catch (Exception e) {
+        console.printInfo("Exception: " + e.getMessage());
+        boolean isInterrupted = hasInterruptedException(e);
+        // Parentheses matter: '%' and '/' share precedence and associate left to right, so the
+        // unparenthesized form gave up on the very first failure instead of retrying.
+        int maxFailures = MAX_RETRY_INTERVAL / CHECK_INTERVAL;
+        if (isInterrupted || (++failedCounter % maxFailures == 0)) {
+          try {
+            console.printInfo("Killing DAG...");
+            dagClient.tryKillDAG();
+          } catch (IOException | TezException tezException) {
+            // best effort
+          }
+          console
+              .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e));
+          rc = 1;
+          done = true;
+        } else {
+          console.printInfo("Retrying...");
+        }
+      } finally {
+        if (done) {
+          if (rc != 0 && status != null) {
+            for (String diag : status.getDiagnostics()) {
+              console.printError(diag);
+              diagnostics.append(diag);
+            }
+          }
+          synchronized (shutdownList) {
+            shutdownList.remove(dagClient);
+          }
+          break;
+        }
+      }
+    }
+
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
+    printSummary(success, vertexProgressMap);
+    return rc;
+  }
+
+  /** Prints the post-run summaries; only when profiling is enabled and the DAG succeeded. */
+  private void printSummary(boolean success, Map<String, Progress> progressMap) {
+    if (isProfilingEnabled() && success && progressMap != null) {
+
+      double duration = (System.currentTimeMillis() - this.executionStartTime) / 1000.0;
+      console.printInfo("Status: DAG finished successfully in " + String.format("%.2f seconds", duration));
+      console.printInfo("");
+
+      new QueryExecutionBreakdownSummary(perfLogger).print(console);
+      new DAGSummary(progressMap, hiveConf, dagClient, dag, perfLogger).print(console);
+
+      //llap IO summary
+      if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.LLAP_IO_ENABLED, false)) {
+        new LLAPioSummary(progressMap, dagClient).print(console);
+        new FSCountersSummary(progressMap, dagClient).print(console);
+      }
+      console.printInfo("");
+    }
+  }
+
+  /** Walks the cause chain looking for an InterruptedException or InterruptedIOException. */
+  private static boolean hasInterruptedException(Throwable e) {
+    // Hadoop IPC wraps InterruptedException. GRRR.
+    while (e != null) {
+      if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+        return true;
+      }
+      e = e.getCause();
+    }
+    return false;
+  }
+
+  /**
+   * killRunningJobs tries to terminate execution of all
+   * currently running tez queries. No guarantees, best effort only.
+   */
+  private static void killRunningJobs() {
+    synchronized (shutdownList) {
+      for (DAGClient c : shutdownList) {
+        try {
+          System.err.println("Trying to shutdown DAG");
+          c.tryKillDAG();
+        } catch (Exception e) {
+          // ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up a counter by group and counter name.
+   *
+   * @return the counter value, or 0 when the group has no such counter
+   */
+  static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern,
+                                         String counterName) {
+    TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
+    return (tezCounter == null) ? 0 : tezCounter.getValue();
+  }
+
+  /**
+   * Publishes the current progress when the report text changed or PRINT_INTERVAL has elapsed
+   * since the last print.
+   *
+   * @return the freshly built report, to be passed back in as {@code lastReport}
+   */
+  private String updateStatus(DAGStatus status, Map<String, Progress> vertexProgressMap,
+      String lastReport) {
+    String report = getReport(vertexProgressMap);
+    if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL) {
+      updateFunction.update(status, vertexProgressMap, report);
+      lastPrintTime = System.currentTimeMillis();
+    }
+    return report;
+  }
+
+  /**
+   * Builds the one-line progress report with one tab-terminated entry per vertex (sorted by
+   * name), e.g. {@code name: complete(+running,-failed)/total}. As a side effect records the
+   * start and end of each vertex in the perf logger.
+   */
+  private String getReport(Map<String, Progress> progressMap) {
+    StringBuilder reportBuffer = new StringBuilder();
+
+    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+    for (String s : keys) {
+      Progress progress = progressMap.get(s);
+      final int complete = progress.getSucceededTaskCount();
+      final int total = progress.getTotalTaskCount();
+      final int running = progress.getRunningTaskCount();
+      final int failed = progress.getFailedTaskAttemptCount();
+      if (total <= 0) {
+        reportBuffer.append(String.format("%s: -/-\t", s));
+      } else {
+        if (complete == total) {
+          /*
+           * We may have missed the start of the vertex due to the 3 seconds interval
+           */
+          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+          }
+
+          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+        }
+        if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+
+          if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+            perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+          }
+
+          /* vertex is started, but not complete */
+          if (failed > 0) {
+            reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+          } else {
+            reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+          }
+        } else {
+          /* vertex is waiting for input/slots or complete */
+          if (failed > 0) {
+            /* tasks finished but some failed */
+            reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
+          } else {
+            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+          }
+        }
+      }
+    }
+
+    return reportBuffer.toString();
+  }
+
+  /** Returns the DAG diagnostics collected when the run ended with a non-zero return code. */
+  public String getDiagnostics() {
+    return diagnostics.toString();
+  }
+
+  /** Snapshots the current progress; falls back to TezProgressMonitor.NULL if that fails. */
+  private ProgressMonitor progressMonitor(DAGStatus status, Map<String, Progress> progressMap) {
+    try {
+      return new TezProgressMonitor(dagClient, status, workMap, progressMap, console,
+          executionStartTime);
+    } catch (IOException | TezException e) {
+      console.printInfo("Getting  Progress Information: " + e.getMessage() + " stack trace: " +
+          ExceptionUtils.getStackTrace(e));
+    }
+    return TezProgressMonitor.NULL;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
new file mode 100644
index 0000000..3475fc2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
@@ -0,0 +1,313 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.tez.dag.api.client.DAGStatus.State.KILLED;
+
+class TezProgressMonitor implements ProgressMonitor {
+  private static final int COLUMN_1_WIDTH = 16;
+  private final Map<String, BaseWork> workMap;
+  private final SessionState.LogHelper console;
+  private final long executionStartTime;
+  private final DAGStatus status;
+  Map<String, VertexStatus> vertexStatusMap = new HashMap<>();
+  Map<String, VertexProgress> progressCountsMap = new HashMap<>();
+
+  /**
+   * Snapshots each vertex's progress counts and (best effort) its status from dagClient up front,
+   * so that this object can still be used after the tez job has finished.
+   */
+  TezProgressMonitor(DAGClient dagClient, DAGStatus status, Map<String, BaseWork> workMap,
+      Map<String, Progress> progressMap, SessionState.LogHelper console, long executionStartTime)
+      throws IOException, TezException {
+    this.status = status;
+    this.workMap = workMap;
+    this.console = console;
+    this.executionStartTime = executionStartTime;
+    for (Map.Entry<String, Progress> entry : progressMap.entrySet()) {
+      String vertexName = entry.getKey();
+      progressCountsMap.put(vertexName, new VertexProgress(entry.getValue(), status.getState()));
+      try {
+        vertexStatusMap.put(vertexName, dagClient.getVertexStatus(vertexName, null));
+      } catch (IOException e) {
+        // best attempt, shouldn't really kill DAG for this; the vertex simply gets no status entry
+      }
+    }
+  }
+
+  public List<String> headers() {
+    return Arrays.asList(
+        "VERTICES",
+        "MODE",
+        "STATUS",
+        "TOTAL",
+        "COMPLETED",
+        "RUNNING",
+        "PENDING",
+        "FAILED",
+        "KILLED"
+    );
+  }
+
+  public List<List<String>> rows() {
+    try {
+      List<List<String>> results = new ArrayList<>();
+      SortedSet<String> keys = new TreeSet<>(progressCountsMap.keySet());
+      for (String s : keys) {
+        VertexProgress progress = progressCountsMap.get(s);
+
+        // Map 1 .......... container  SUCCEEDED      7          7        0        0       0       0
+
+        results.add(
+            Arrays.asList(
+                getNameWithProgress(s, progress.succeededTaskCount, progress.totalTaskCount),
+                getMode(s, workMap),
+                progress.vertexStatus(vertexStatusMap.get(s)),
+                progress.total(),
+                progress.completed(),
+                progress.running(),
+                progress.pending(),
+                progress.failed(),
+                progress.killed()
+            )
+        );
+      }
+      return results;
+    } catch (Exception e) {
+      console.printInfo(
+          "Getting  Progress Bar table rows failed: " + e.getMessage() + " stack trace: " + Arrays
+              .toString(e.getStackTrace())
+      );
+    }
+    return Collections.emptyList();
+  }
+
+  // -------------------------------------------------------------------------------
+  // VERTICES: 03/04            [=================>>-----] 86%  ELAPSED TIME: 1.71 s
+  // -------------------------------------------------------------------------------
+  // contains footerSummary, progressedPercentage, startTime
+
+  @Override
+  public String footerSummary() {
+    return String.format("VERTICES: %02d/%02d", completed(), progressCountsMap.keySet().size());
+  }
+
+  @Override
+  public long startTime() {
+    return executionStartTime;
+  }
+
+  /** Returns succeeded tasks divided by total tasks, summed over vertices with a known total. */
+  @Override
+  public double progressedPercentage() {
+    long sumTotal = 0, sumComplete = 0;
+    for (VertexProgress progress : progressCountsMap.values()) {
+      // vertices whose task count is not yet known (total <= 0) are left out of both sums
+      if (progress.totalTaskCount > 0) {
+        sumTotal += progress.totalTaskCount;
+        sumComplete += progress.succeededTaskCount;
+      }
+    }
+    // divide in double precision so no digits are lost to an intermediate float
+    return (sumTotal == 0) ? 0.0 : (double) sumComplete / (double) sumTotal;
+  }
+
+  @Override
+  public String executionStatus() {
+    return this.status.getState().name();
+  }
+
+  /**
+   * Counts the vertices that report a positive task total and whose tasks have all succeeded.
+   */
+  private int completed() {
+    int finishedVertices = 0;
+    for (VertexProgress vertex : progressCountsMap.values()) {
+      // a vertex with no known tasks (total <= 0) is never counted as completed
+      boolean hasTasks = vertex.totalTaskCount > 0;
+      if (hasTasks && vertex.succeededTaskCount == vertex.totalTaskCount) {
+        finishedVertices++;
+      }
+    }
+    return finishedVertices;
+  }
+
+  // Map 1 ..........
+
+  /** Renders column 1: the (possibly trimmed) vertex name followed by a dotted progress bar. */
+  private String getNameWithProgress(String s, int complete, int total) {
+    if (s == null) {
+      return "";
+    }
+    float percent = total == 0 ? 0.0f : (float) complete / (float) total;
+    // lets use the remaining space in column 1 as progress bar; for names longer than the
+    // column this is negative and no dots are drawn
+    int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
+
+    // if the vertex name is longer than column 1 width, trim it down to exactly that width:
+    // "Tez Merge File Work" will become "Tez Merge File.."
+    String trimmedVName = s;
+    if (s.length() > COLUMN_1_WIDTH) {
+      trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2) + "..";
+    }
+    StringBuilder result = new StringBuilder(trimmedVName).append(' ');
+    int toFill = (int) (spaceRemaining * percent);
+    for (int i = 0; i < toFill; i++) {
+      result.append('.');
+    }
+    return result.toString();
+  }
+
+  /**
+   * Picks the execution mode label for a vertex; a vertex without an entry in workMap is
+   * labelled "container".
+   */
+  private String getMode(String name, Map<String, BaseWork> workMap) {
+    BaseWork vertexWork = workMap.get(name);
+    if (vertexWork == null) {
+      return "container";
+    }
+    // precedence: uber > llap > container
+    if (vertexWork.getUberMode()) {
+      return "uber";
+    }
+    return vertexWork.getLlapMode() ? "llap" : "container";
+  }
+
+  static class VertexProgress {
+    private final int totalTaskCount;
+    private final int succeededTaskCount;
+    private final int failedTaskAttemptCount;
+    private final long killedTaskAttemptCount;
+    private final int runningTaskCount;
+    private final DAGStatus.State dagState;
+
+    VertexProgress(Progress progress, DAGStatus.State dagState) {
+      this(progress.getTotalTaskCount(), progress.getSucceededTaskCount(),
+          progress.getFailedTaskAttemptCount(), progress.getKilledTaskAttemptCount(),
+          progress.getRunningTaskCount(), dagState);
+    }
+
+    VertexProgress(int totalTaskCount, int succeededTaskCount, int failedTaskAttemptCount,
+        int killedTaskAttemptCount, int runningTaskCount, DAGStatus.State dagState) {
+      this.totalTaskCount = totalTaskCount;
+      this.succeededTaskCount = succeededTaskCount;
+      this.failedTaskAttemptCount = failedTaskAttemptCount;
+      this.killedTaskAttemptCount = killedTaskAttemptCount;
+      this.runningTaskCount = runningTaskCount;
+      this.dagState = dagState;
+    }
+
+    boolean isRunning() {
+      return succeededTaskCount < totalTaskCount && (succeededTaskCount > 0 || runningTaskCount > 0
+          || failedTaskAttemptCount > 0);
+    }
+
+    String vertexStatus(VertexStatus vertexStatus) {
+      // Infer the vertex state from the task counts rather than asking the AM on every refresh
+      // of the UI. The AM-reported status handed in by the caller (may be null) is consulted
+      // only when the DAG state is KILLED.
+      VertexStatus.State vertexState = VertexStatus.State.INITIALIZING;
+      if (totalTaskCount > 0) {
+        vertexState = VertexStatus.State.INITED;
+      }
+
+      // RUNNING state
+      if (isRunning()) {
+        vertexState = VertexStatus.State.RUNNING;
+      }
+
+      // SUCCEEDED state (this also matches a vertex whose total and succeeded counts are both 0)
+      if (succeededTaskCount == totalTaskCount) {
+        vertexState = VertexStatus.State.SUCCEEDED;
+      }
+
+      // DAG has been killed: prefer the vertex state reported by the AM, when one was captured,
+      // over the inferred one (note that a FAILED DAG still uses the inferred state)
+      if (dagState == KILLED) {
+        if (vertexStatus != null) {
+          vertexState = vertexStatus.getState();
+        }
+      }
+      return vertexState.toString();
+    }
+
+    //    "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED"
+
+    String total() {
+      return String.valueOf(totalTaskCount);
+    }
+
+    String completed() {
+      return String.valueOf(succeededTaskCount);
+    }
+
+    String running() {
+      return String.valueOf(runningTaskCount);
+    }
+
+    String pending() {
+      return String.valueOf(totalTaskCount - succeededTaskCount - runningTaskCount);
+    }
+
+    String failed() {
+      return String.valueOf(failedTaskAttemptCount);
+    }
+
+    String killed() {
+      return String.valueOf(killedTaskAttemptCount);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o)
+        return true;
+      if (o == null || getClass() != o.getClass())
+        return false;
+
+      VertexProgress that = (VertexProgress) o;
+
+      if (totalTaskCount != that.totalTaskCount)
+        return false;
+      if (succeededTaskCount != that.succeededTaskCount)
+        return false;
+      if (failedTaskAttemptCount != that.failedTaskAttemptCount)
+        return false;
+      if (killedTaskAttemptCount != that.killedTaskAttemptCount)
+        return false;
+      if (runningTaskCount != that.runningTaskCount)
+        return false;
+      return dagState == that.dagState;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = totalTaskCount;
+      result = 31 * result + succeededTaskCount;
+      result = 31 * result + failedTaskAttemptCount;
+      result = 31 * result + (int) (killedTaskAttemptCount ^ (killedTaskAttemptCount >>> 32));
+      result = 31 * result + runningTaskCount;
+      // dagState may be null (equals() already tolerates that), so guard against an NPE here
+      return 31 * result + (dagState == null ? 0 : dagState.hashCode());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c5b3517..ed854bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -26,8 +26,7 @@ import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
 import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
 import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
 import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -127,11 +126,11 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
@@ -1898,7 +1897,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     final AtomicInteger partitionsLoaded = new AtomicInteger(0);
 
     final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
-        && InPlaceUpdates.inPlaceEligible(conf);
+        && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
     final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
     final SessionState parentSession = SessionState.get();
 
@@ -1926,9 +1925,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
               if (inPlaceEligible) {
                 synchronized (ps) {
-                  InPlaceUpdates.rePositionCursor(ps);
+                  InPlaceUpdate.rePositionCursor(ps);
                   partitionsLoaded.incrementAndGet();
-                  InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+                  InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
                       + partsToLoad + " partitions.");
                 }
               }

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index d607f61..3e01e92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -185,6 +186,7 @@ public class SessionState {
   private HiveAuthorizationProvider authorizer;
 
   private HiveAuthorizer authorizerV2;
+  private volatile ProgressMonitor progressMonitor;
 
   public enum AuthorizationMode{V1, V2};
 
@@ -1564,6 +1566,7 @@ public class SessionState {
       // removes the threadlocal variables, closes underlying HMS connection
       Hive.closeCurrent();
     }
+    progressMonitor = null;
   }
 
   private void unCacheDataNucleusClassLoaders() {
@@ -1744,6 +1747,15 @@ public class SessionState {
   public String getReloadableAuxJars() {
     return StringUtils.join(preReloadableAuxJars, ',');
   }
+
+  public void updateProgressMonitor(ProgressMonitor progressMonitor) {
+    this.progressMonitor = progressMonitor;
+  }
+
+  public ProgressMonitor getProgressMonitor() {
+    return progressMonitor;
+  }
+
 }
 
 class ResourceMaps {

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
new file mode 100644
index 0000000..648d625
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
@@ -0,0 +1,101 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestTezProgressMonitor {
+
+  private static final String REDUCER = "Reducer";
+  private static final String MAPPER = "Mapper";
+  @Mock
+  private DAGClient dagClient;
+  @Mock
+  private SessionState.LogHelper console;
+  @Mock
+  private DAGStatus dagStatus;
+  @Mock
+  private Progress mapperProgress;
+  @Mock
+  private Progress reducerProgress;
+  @Mock
+  private VertexStatus succeeded;
+  @Mock
+  private VertexStatus running;
+
+  private Map<String, Progress> progressMap() {
+    return new HashMap<String, Progress>() {{
+      put(MAPPER, setup(mapperProgress, 2, 1, 3, 4, 5));
+      put(REDUCER, setup(reducerProgress, 3, 2, 1, 0, 1));
+    }};
+  }
+
+  private Progress setup(Progress progressMock, int total, int succeeded, int failedAttempt,
+      int killedAttempt, int running) {
+    when(progressMock.getTotalTaskCount()).thenReturn(total);
+    when(progressMock.getSucceededTaskCount()).thenReturn(succeeded);
+    when(progressMock.getFailedTaskAttemptCount()).thenReturn(failedAttempt);
+    when(progressMock.getKilledTaskAttemptCount()).thenReturn(killedAttempt);
+    when(progressMock.getRunningTaskCount()).thenReturn(running);
+    return progressMock;
+  }
+
+  @Test
+  public void setupInternalStateOnObjectCreation() throws IOException, TezException {
+    when(dagStatus.getState()).thenReturn(DAGStatus.State.RUNNING);
+    when(dagClient.getVertexStatus(eq(MAPPER), anySet())).thenReturn(succeeded);
+    when(dagClient.getVertexStatus(eq(REDUCER), anySet())).thenReturn(running);
+
+    TezProgressMonitor monitor =
+        new TezProgressMonitor(dagClient, dagStatus, new HashMap<String, BaseWork>(), progressMap(), console,
+            Long.MAX_VALUE);
+
+    verify(dagClient).getVertexStatus(eq(MAPPER), isNull(Set.class));
+    verify(dagClient).getVertexStatus(eq(REDUCER), isNull(Set.class));
+    verifyNoMoreInteractions(dagClient);
+
+    assertThat(monitor.vertexStatusMap.keySet(), hasItems(MAPPER, REDUCER));
+    assertThat(monitor.vertexStatusMap.get(MAPPER), is(sameInstance(succeeded)));
+    assertThat(monitor.vertexStatusMap.get(REDUCER), is(sameInstance(running)));
+
+    assertThat(monitor.progressCountsMap.keySet(), hasItems(MAPPER, REDUCER));
+    TezProgressMonitor.VertexProgress expectedMapperState =
+        new TezProgressMonitor.VertexProgress(2, 1, 3, 4, 5, DAGStatus.State.RUNNING);
+    assertThat(monitor.progressCountsMap.get(MAPPER), is(equalTo(expectedMapperState)));
+
+    TezProgressMonitor.VertexProgress expectedReducerState =
+        new TezProgressMonitor.VertexProgress(3, 2, 1, 0, 1, DAGStatus.State.RUNNING);
+    assertThat(monitor.progressCountsMap.get(REDUCER), is(equalTo(expectedReducerState)));
+
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/if/TCLIService.thrift
----------------------------------------------------------------------
diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift
index a4fa7b0..824b049 100644
--- a/service-rpc/if/TCLIService.thrift
+++ b/service-rpc/if/TCLIService.thrift
@@ -63,6 +63,9 @@ enum TProtocolVersion {
 
   // V9 adds support for serializing ResultSets in SerDe
   HIVE_CLI_SERVICE_PROTOCOL_V9
+
+  // V10 adds support for in place updates via GetOperationStatus
+  HIVE_CLI_SERVICE_PROTOCOL_V10
 }
 
 enum TTypeId {
@@ -559,7 +562,7 @@ struct TOperationHandle {
 // which operations may be executed.
 struct TOpenSessionReq {
   // The version of the HiveServer2 protocol that the client is using.
-  1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9
+  1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
 
   // Username and password for authentication.
   // Depending on the authentication scheme being used,
@@ -578,7 +581,7 @@ struct TOpenSessionResp {
   1: required TStatus status
 
   // The protocol version that the server is using.
-  2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9
+  2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
 
   // Session Handle
   3: optional TSessionHandle sessionHandle
@@ -1019,6 +1022,8 @@ struct TGetCrossReferenceResp {
 struct TGetOperationStatusReq {
   // Session to run this request against
   1: required TOperationHandle operationHandle
+  // optional arguments to get progress information
+  2: optional bool getProgressUpdate
 }
 
 struct TGetOperationStatusResp {
@@ -1047,6 +1052,8 @@ struct TGetOperationStatusResp {
   // If the operation has the result
   9: optional bool hasResultSet
 
+  10: optional TProgressUpdateResp progressUpdateResponse
+
 }
 
 
@@ -1202,6 +1209,21 @@ struct TRenewDelegationTokenResp {
   1: required TStatus status
 }
 
+enum TJobExecutionStatus {
+    IN_PROGRESS,
+    COMPLETE,
+    NOT_AVAILABLE
+}
+
+struct TProgressUpdateResp {
+  1: required list<string> headerNames
+  2: required list<list<string>> rows
+  3: required double progressedPercentage
+  4: required TJobExecutionStatus status
+  5: required string footerSummary
+  6: required i64 startTime
+}
+
 service TCLIService {
 
   TOpenSessionResp OpenSession(1:TOpenSessionReq req);

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
index 2f460e8..0a17e0e 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
@@ -269,6 +269,18 @@ const char* _kTFetchOrientationNames[] = {
 };
 const std::map<int, const char*> _TFetchOrientation_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kTFetchOrientationValues, _kTFetchOrientationNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
 
+int _kTJobExecutionStatusValues[] = {
+  TJobExecutionStatus::IN_PROGRESS,
+  TJobExecutionStatus::COMPLETE,
+  TJobExecutionStatus::NOT_AVAILABLE
+};
+const char* _kTJobExecutionStatusNames[] = {
+  "IN_PROGRESS",
+  "COMPLETE",
+  "NOT_AVAILABLE"
+};
+const std::map<int, const char*> _TJobExecutionStatus_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(3, _kTJobExecutionStatusValues, _kTJobExecutionStatusNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+
 
 TTypeQualifierValue::~TTypeQualifierValue() throw() {
 }
@@ -8174,6 +8186,11 @@ void TGetOperationStatusReq::__set_operationHandle(const TOperationHandle& val)
   this->operationHandle = val;
 }
 
+void TGetOperationStatusReq::__set_getProgressUpdate(const bool val) {
+  this->getProgressUpdate = val;
+__isset.getProgressUpdate = true;
+}
+
 uint32_t TGetOperationStatusReq::read(::apache::thrift::protocol::TProtocol* iprot) {
 
   apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -8204,6 +8221,14 @@ uint32_t TGetOperationStatusReq::read(::apache::thrift::protocol::TProtocol* ipr
           xfer += iprot->skip(ftype);
         }
         break;
+      case 2:
+        if (ftype == ::apache::thrift::protocol::T_BOOL) {
+          xfer += iprot->readBool(this->getProgressUpdate);
+          this->__isset.getProgressUpdate = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -8227,6 +8252,11 @@ uint32_t TGetOperationStatusReq::write(::apache::thrift::protocol::TProtocol* op
   xfer += this->operationHandle.write(oprot);
   xfer += oprot->writeFieldEnd();
 
+  if (this->__isset.getProgressUpdate) {
+    xfer += oprot->writeFieldBegin("getProgressUpdate", ::apache::thrift::protocol::T_BOOL, 2);
+    xfer += oprot->writeBool(this->getProgressUpdate);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -8235,19 +8265,26 @@ uint32_t TGetOperationStatusReq::write(::apache::thrift::protocol::TProtocol* op
 void swap(TGetOperationStatusReq &a, TGetOperationStatusReq &b) {
   using ::std::swap;
   swap(a.operationHandle, b.operationHandle);
+  swap(a.getProgressUpdate, b.getProgressUpdate);
+  swap(a.__isset, b.__isset);
 }
 
 TGetOperationStatusReq::TGetOperationStatusReq(const TGetOperationStatusReq& other268) {
   operationHandle = other268.operationHandle;
+  getProgressUpdate = other268.getProgressUpdate;
+  __isset = other268.__isset;
 }
 TGetOperationStatusReq& TGetOperationStatusReq::operator=(const TGetOperationStatusReq& other269) {
   operationHandle = other269.operationHandle;
+  getProgressUpdate = other269.getProgressUpdate;
+  __isset = other269.__isset;
   return *this;
 }
 void TGetOperationStatusReq::printTo(std::ostream& out) const {
   using ::apache::thrift::to_string;
   out << "TGetOperationStatusReq(";
   out << "operationHandle=" << to_string(operationHandle);
+  out << ", " << "getProgressUpdate="; (__isset.getProgressUpdate ? (out << to_string(getProgressUpdate)) : (out << "<null>"));
   out << ")";
 }
 
@@ -8300,6 +8337,11 @@ void TGetOperationStatusResp::__set_hasResultSet(const bool val) {
 __isset.hasResultSet = true;
 }
 
+void TGetOperationStatusResp::__set_progressUpdateResponse(const TProgressUpdateResp& val) {
+  this->progressUpdateResponse = val;
+__isset.progressUpdateResponse = true;
+}
+
 uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* iprot) {
 
   apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -8396,6 +8438,14 @@ uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* ip
           xfer += iprot->skip(ftype);
         }
         break;
+      case 10:
+        if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+          xfer += this->progressUpdateResponse.read(iprot);
+          this->__isset.progressUpdateResponse = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -8459,6 +8509,11 @@ uint32_t TGetOperationStatusResp::write(::apache::thrift::protocol::TProtocol* o
     xfer += oprot->writeBool(this->hasResultSet);
     xfer += oprot->writeFieldEnd();
   }
+  if (this->__isset.progressUpdateResponse) {
+    xfer += oprot->writeFieldBegin("progressUpdateResponse", ::apache::thrift::protocol::T_STRUCT, 10);
+    xfer += this->progressUpdateResponse.write(oprot);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -8475,6 +8530,7 @@ void swap(TGetOperationStatusResp &a, TGetOperationStatusResp &b) {
   swap(a.operationStarted, b.operationStarted);
   swap(a.operationCompleted, b.operationCompleted);
   swap(a.hasResultSet, b.hasResultSet);
+  swap(a.progressUpdateResponse, b.progressUpdateResponse);
   swap(a.__isset, b.__isset);
 }
 
@@ -8488,6 +8544,7 @@ TGetOperationStatusResp::TGetOperationStatusResp(const TGetOperationStatusResp&
   operationStarted = other271.operationStarted;
   operationCompleted = other271.operationCompleted;
   hasResultSet = other271.hasResultSet;
+  progressUpdateResponse = other271.progressUpdateResponse;
   __isset = other271.__isset;
 }
 TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationStatusResp& other272) {
@@ -8500,6 +8557,7 @@ TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationS
   operationStarted = other272.operationStarted;
   operationCompleted = other272.operationCompleted;
   hasResultSet = other272.hasResultSet;
+  progressUpdateResponse = other272.progressUpdateResponse;
   __isset = other272.__isset;
   return *this;
 }
@@ -8515,6 +8573,7 @@ void TGetOperationStatusResp::printTo(std::ostream& out) const {
   out << ", " << "operationStarted="; (__isset.operationStarted ? (out << to_string(operationStarted)) : (out << "<null>"));
   out << ", " << "operationCompleted="; (__isset.operationCompleted ? (out << to_string(operationCompleted)) : (out << "<null>"));
   out << ", " << "hasResultSet="; (__isset.hasResultSet ? (out << to_string(hasResultSet)) : (out << "<null>"));
+  out << ", " << "progressUpdateResponse="; (__isset.progressUpdateResponse ? (out << to_string(progressUpdateResponse)) : (out << "<null>"));
   out << ")";
 }
 
@@ -9984,4 +10043,267 @@ void TRenewDelegationTokenResp::printTo(std::ostream& out) const {
   out << ")";
 }
 
+
+TProgressUpdateResp::~TProgressUpdateResp() throw() {  // out-of-line destructor, declared non-throwing; the body is empty
+}
+
+
+void TProgressUpdateResp::__set_headerNames(const std::vector<std::string> & val) {  // setter for headerNames (field id 1 on the wire)
+  this->headerNames = val;  // copy-assigns the whole vector
+}
+
+void TProgressUpdateResp::__set_rows(const std::vector<std::vector<std::string> > & val) {  // setter for rows (field id 2 on the wire)
+  this->rows = val;  // copy-assigns the outer vector and every inner row
+}
+
+void TProgressUpdateResp::__set_progressedPercentage(const double val) {  // setter for progressedPercentage (field id 3 on the wire)
+  this->progressedPercentage = val;  // stored as given; no range check is applied here
+}
+
+void TProgressUpdateResp::__set_status(const TJobExecutionStatus::type val) {  // setter for status (field id 4 on the wire)
+  this->status = val;  // stored as given; written to the wire as an int32 by write()
+}
+
+void TProgressUpdateResp::__set_footerSummary(const std::string& val) {  // setter for footerSummary (field id 5 on the wire)
+  this->footerSummary = val;  // copy-assigns the string
+}
+
+void TProgressUpdateResp::__set_startTime(const int64_t val) {  // setter for startTime (field id 6 on the wire)
+  this->startTime = val;  // stored as given; the unit is not visible here -- TODO confirm against the producer
+}
+
+uint32_t TProgressUpdateResp::read(::apache::thrift::protocol::TProtocol* iprot) {
+  // Populates this struct from iprot and returns the sum of the values returned by the protocol read calls.
+  apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);  // scoped helper; presumably bounds struct nesting depth -- confirm in Thrift headers
+  uint32_t xfer = 0;
+  std::string fname;
+  ::apache::thrift::protocol::TType ftype;
+  int16_t fid;
+
+  xfer += iprot->readStructBegin(fname);
+
+  using ::apache::thrift::protocol::TProtocolException;
+  // One "seen" flag per field; all six are checked once the field loop has finished.
+  bool isset_headerNames = false;
+  bool isset_rows = false;
+  bool isset_progressedPercentage = false;
+  bool isset_status = false;
+  bool isset_footerSummary = false;
+  bool isset_startTime = false;
+  // Consume fields one at a time until the protocol reports T_STOP.
+  while (true)
+  {
+    xfer += iprot->readFieldBegin(fname, ftype, fid);
+    if (ftype == ::apache::thrift::protocol::T_STOP) {
+      break;
+    }
+    switch (fid)  // dispatch on field id; a known id with an unexpected wire type is skipped, not rejected
+    {
+      case 1:  // headerNames: list of strings
+        if (ftype == ::apache::thrift::protocol::T_LIST) {
+          {
+            this->headerNames.clear();
+            uint32_t _size302;
+            ::apache::thrift::protocol::TType _etype305;
+            xfer += iprot->readListBegin(_etype305, _size302);  // the element type is read but not checked afterwards
+            this->headerNames.resize(_size302);  // sized directly from the count announced on the wire
+            uint32_t _i306;
+            for (_i306 = 0; _i306 < _size302; ++_i306)
+            {
+              xfer += iprot->readString(this->headerNames[_i306]);
+            }
+            xfer += iprot->readListEnd();
+          }
+          isset_headerNames = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 2:  // rows: list of lists of strings
+        if (ftype == ::apache::thrift::protocol::T_LIST) {
+          {
+            this->rows.clear();
+            uint32_t _size307;
+            ::apache::thrift::protocol::TType _etype310;
+            xfer += iprot->readListBegin(_etype310, _size307);
+            this->rows.resize(_size307);
+            uint32_t _i311;
+            for (_i311 = 0; _i311 < _size307; ++_i311)  // outer loop: one iteration per row
+            {
+              {
+                this->rows[_i311].clear();
+                uint32_t _size312;
+                ::apache::thrift::protocol::TType _etype315;
+                xfer += iprot->readListBegin(_etype315, _size312);
+                this->rows[_i311].resize(_size312);
+                uint32_t _i316;
+                for (_i316 = 0; _i316 < _size312; ++_i316)  // inner loop: one iteration per cell of the row
+                {
+                  xfer += iprot->readString(this->rows[_i311][_i316]);
+                }
+                xfer += iprot->readListEnd();
+              }
+            }
+            xfer += iprot->readListEnd();
+          }
+          isset_rows = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 3:  // progressedPercentage: double
+        if (ftype == ::apache::thrift::protocol::T_DOUBLE) {
+          xfer += iprot->readDouble(this->progressedPercentage);
+          isset_progressedPercentage = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 4:  // status: enum carried as an int32
+        if (ftype == ::apache::thrift::protocol::T_I32) {
+          int32_t ecast317;
+          xfer += iprot->readI32(ecast317);
+          this->status = (TJobExecutionStatus::type)ecast317;  // plain cast; the value is not validated against the enum
+          isset_status = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 5:  // footerSummary: string
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->footerSummary);
+          isset_footerSummary = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      case 6:  // startTime: 64-bit integer
+        if (ftype == ::apache::thrift::protocol::T_I64) {
+          xfer += iprot->readI64(this->startTime);
+          isset_startTime = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
+      default:  // unknown field id: skip its payload and carry on
+        xfer += iprot->skip(ftype);
+        break;
+    }
+    xfer += iprot->readFieldEnd();
+  }
+
+  xfer += iprot->readStructEnd();
+  // Every field is mandatory here: any field that was not read (absent or skipped) raises INVALID_DATA.
+  if (!isset_headerNames)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_rows)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_progressedPercentage)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_status)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_footerSummary)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_startTime)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
+  return xfer;
+}
+
+uint32_t TProgressUpdateResp::write(::apache::thrift::protocol::TProtocol* oprot) const {
+  uint32_t xfer = 0;  // running sum of the values returned by the protocol write calls; returned at the end
+  apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);  // scoped helper; presumably bounds struct nesting depth -- confirm in Thrift headers
+  xfer += oprot->writeStructBegin("TProgressUpdateResp");
+  // All six fields are written unconditionally, in field-id order 1..6.
+  xfer += oprot->writeFieldBegin("headerNames", ::apache::thrift::protocol::T_LIST, 1);
+  {
+    xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->headerNames.size()));
+    std::vector<std::string> ::const_iterator _iter318;
+    for (_iter318 = this->headerNames.begin(); _iter318 != this->headerNames.end(); ++_iter318)
+    {
+      xfer += oprot->writeString((*_iter318));
+    }
+    xfer += oprot->writeListEnd();
+  }
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("rows", ::apache::thrift::protocol::T_LIST, 2);
+  {
+    xfer += oprot->writeListBegin(::apache::thrift::protocol::T_LIST, static_cast<uint32_t>(this->rows.size()));  // outer list: its elements are lists
+    std::vector<std::vector<std::string> > ::const_iterator _iter319;
+    for (_iter319 = this->rows.begin(); _iter319 != this->rows.end(); ++_iter319)
+    {
+      {
+        xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>((*_iter319).size()));  // inner list: one row of strings
+        std::vector<std::string> ::const_iterator _iter320;
+        for (_iter320 = (*_iter319).begin(); _iter320 != (*_iter319).end(); ++_iter320)
+        {
+          xfer += oprot->writeString((*_iter320));
+        }
+        xfer += oprot->writeListEnd();
+      }
+    }
+    xfer += oprot->writeListEnd();
+  }
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("progressedPercentage", ::apache::thrift::protocol::T_DOUBLE, 3);
+  xfer += oprot->writeDouble(this->progressedPercentage);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("status", ::apache::thrift::protocol::T_I32, 4);
+  xfer += oprot->writeI32((int32_t)this->status);  // the enum is sent as its int32 value
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("footerSummary", ::apache::thrift::protocol::T_STRING, 5);
+  xfer += oprot->writeString(this->footerSummary);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldBegin("startTime", ::apache::thrift::protocol::T_I64, 6);
+  xfer += oprot->writeI64(this->startTime);
+  xfer += oprot->writeFieldEnd();
+
+  xfer += oprot->writeFieldStop();  // stop marker: the matching read() loop ends on T_STOP
+  xfer += oprot->writeStructEnd();
+  return xfer;
+}
+
+void swap(TProgressUpdateResp &a, TProgressUpdateResp &b) {  // exchanges all six data members of a and b
+  using ::std::swap;  // makes std::swap visible to the unqualified calls below
+  swap(a.headerNames, b.headerNames);
+  swap(a.rows, b.rows);
+  swap(a.progressedPercentage, b.progressedPercentage);
+  swap(a.status, b.status);
+  swap(a.footerSummary, b.footerSummary);
+  swap(a.startTime, b.startTime);
+}
+
+TProgressUpdateResp::TProgressUpdateResp(const TProgressUpdateResp& other321) {  // copy constructor: members are assigned in the body, not in an initializer list
+  headerNames = other321.headerNames;
+  rows = other321.rows;
+  progressedPercentage = other321.progressedPercentage;
+  status = other321.status;
+  footerSummary = other321.footerSummary;
+  startTime = other321.startTime;
+}
+TProgressUpdateResp& TProgressUpdateResp::operator=(const TProgressUpdateResp& other322) {  // member-wise copy assignment; there is no explicit self-assignment check
+  headerNames = other322.headerNames;
+  rows = other322.rows;
+  progressedPercentage = other322.progressedPercentage;
+  status = other322.status;
+  footerSummary = other322.footerSummary;
+  startTime = other322.startTime;
+  return *this;  // returns *this so assignments can be chained
+}
+void TProgressUpdateResp::printTo(std::ostream& out) const {  // writes "TProgressUpdateResp(name=value, ...)" to out
+  using ::apache::thrift::to_string;
+  out << "TProgressUpdateResp(";
+  out << "headerNames=" << to_string(headerNames);
+  out << ", " << "rows=" << to_string(rows);
+  out << ", " << "progressedPercentage=" << to_string(progressedPercentage);
+  out << ", " << "status=" << to_string(status);
+  out << ", " << "footerSummary=" << to_string(footerSummary);
+  out << ", " << "startTime=" << to_string(startTime);
+  out << ")";  // all six fields are always printed; no "<null>" branch is used for this struct
+}
+
 }}}}} // namespace

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
index b249544..6c2bb34 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
@@ -175,6 +175,16 @@ struct TFetchOrientation {
 
 extern const std::map<int, const char*> _TFetchOrientation_VALUES_TO_NAMES;
 
+struct TJobExecutionStatus {  // wrapper struct that scopes the enum, so values are spelled TJobExecutionStatus::IN_PROGRESS etc.
+  enum type {
+    IN_PROGRESS = 0,  // also the value TProgressUpdateResp's default constructor casts into its status member
+    COMPLETE = 1,
+    NOT_AVAILABLE = 2
+  };
+};
+
+extern const std::map<int, const char*> _TJobExecutionStatus_VALUES_TO_NAMES;
+
 typedef int32_t TTypeEntryPtr;
 
 typedef std::string TIdentifier;
@@ -339,6 +349,8 @@ class TRenewDelegationTokenReq;
 
 class TRenewDelegationTokenResp;
 
+class TProgressUpdateResp;
+
 typedef struct _TTypeQualifierValue__isset {
   _TTypeQualifierValue__isset() : i32Value(false), stringValue(false) {}
   bool i32Value :1;
@@ -3669,24 +3681,37 @@ inline std::ostream& operator<<(std::ostream& out, const TGetCrossReferenceResp&
   return out;
 }
 
+typedef struct _TGetOperationStatusReq__isset {  // presence flags for TGetOperationStatusReq; it holds a single flag
+  _TGetOperationStatusReq__isset() : getProgressUpdate(false) {}  // the flag starts out cleared
+  bool getProgressUpdate :1;  // one-bit flag; TGetOperationStatusReq::operator== compares it before the value
+} _TGetOperationStatusReq__isset;
 
 class TGetOperationStatusReq {
  public:
 
   TGetOperationStatusReq(const TGetOperationStatusReq&);
   TGetOperationStatusReq& operator=(const TGetOperationStatusReq&);
-  TGetOperationStatusReq() {
+  TGetOperationStatusReq() : getProgressUpdate(0) {
   }
 
   virtual ~TGetOperationStatusReq() throw();
   TOperationHandle operationHandle;
+  bool getProgressUpdate;
+
+  _TGetOperationStatusReq__isset __isset;
 
   void __set_operationHandle(const TOperationHandle& val);
 
+  void __set_getProgressUpdate(const bool val);
+
   bool operator == (const TGetOperationStatusReq & rhs) const
   {
     if (!(operationHandle == rhs.operationHandle))
       return false;
+    if (__isset.getProgressUpdate != rhs.__isset.getProgressUpdate)
+      return false;
+    else if (__isset.getProgressUpdate && !(getProgressUpdate == rhs.getProgressUpdate))
+      return false;
     return true;
   }
   bool operator != (const TGetOperationStatusReq &rhs) const {
@@ -3710,7 +3735,7 @@ inline std::ostream& operator<<(std::ostream& out, const TGetOperationStatusReq&
 }
 
 typedef struct _TGetOperationStatusResp__isset {
-  _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false) {}
+  _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false), progressUpdateResponse(false) {}
   bool operationState :1;
   bool sqlState :1;
   bool errorCode :1;
@@ -3719,6 +3744,7 @@ typedef struct _TGetOperationStatusResp__isset {
   bool operationStarted :1;
   bool operationCompleted :1;
   bool hasResultSet :1;
+  bool progressUpdateResponse :1;
 } _TGetOperationStatusResp__isset;
 
 class TGetOperationStatusResp {
@@ -3739,6 +3765,7 @@ class TGetOperationStatusResp {
   int64_t operationStarted;
   int64_t operationCompleted;
   bool hasResultSet;
+  TProgressUpdateResp progressUpdateResponse;
 
   _TGetOperationStatusResp__isset __isset;
 
@@ -3760,6 +3787,8 @@ class TGetOperationStatusResp {
 
   void __set_hasResultSet(const bool val);
 
+  void __set_progressUpdateResponse(const TProgressUpdateResp& val);
+
   bool operator == (const TGetOperationStatusResp & rhs) const
   {
     if (!(status == rhs.status))
@@ -3796,6 +3825,10 @@ class TGetOperationStatusResp {
       return false;
     else if (__isset.hasResultSet && !(hasResultSet == rhs.hasResultSet))
       return false;
+    if (__isset.progressUpdateResponse != rhs.__isset.progressUpdateResponse)
+      return false;
+    else if (__isset.progressUpdateResponse && !(progressUpdateResponse == rhs.progressUpdateResponse))
+      return false;
     return true;
   }
   bool operator != (const TGetOperationStatusResp &rhs) const {
@@ -4470,6 +4503,71 @@ inline std::ostream& operator<<(std::ostream& out, const TRenewDelegationTokenRe
   return out;
 }
 
+
+class TProgressUpdateResp {
+ public:
+  // Holds a table (headerNames + rows), a percentage, a status, a footer string and a start time; there is no __isset member.
+  TProgressUpdateResp(const TProgressUpdateResp&);
+  TProgressUpdateResp& operator=(const TProgressUpdateResp&);
+  TProgressUpdateResp() : progressedPercentage(0), status((TJobExecutionStatus::type)0), footerSummary(), startTime(0) {  // status starts at enum value 0 (IN_PROGRESS)
+  }
+
+  virtual ~TProgressUpdateResp() throw();
+  std::vector<std::string>  headerNames;  // default-constructed, so initially empty
+  std::vector<std::vector<std::string> >  rows;  // each inner vector is one row of string cells
+  double progressedPercentage;  // scale (0..1 vs 0..100) is not visible here -- TODO confirm against the producer
+  TJobExecutionStatus::type status;
+  std::string footerSummary;
+  int64_t startTime;  // unit/epoch is not visible here -- TODO confirm against the producer
+
+  void __set_headerNames(const std::vector<std::string> & val);
+
+  void __set_rows(const std::vector<std::vector<std::string> > & val);
+
+  void __set_progressedPercentage(const double val);
+
+  void __set_status(const TJobExecutionStatus::type val);
+
+  void __set_footerSummary(const std::string& val);
+
+  void __set_startTime(const int64_t val);
+
+  bool operator == (const TProgressUpdateResp & rhs) const  // field-by-field equality over all six members
+  {
+    if (!(headerNames == rhs.headerNames))
+      return false;
+    if (!(rows == rhs.rows))
+      return false;
+    if (!(progressedPercentage == rhs.progressedPercentage))  // exact floating-point comparison, no tolerance
+      return false;
+    if (!(status == rhs.status))
+      return false;
+    if (!(footerSummary == rhs.footerSummary))
+      return false;
+    if (!(startTime == rhs.startTime))
+      return false;
+    return true;
+  }
+  bool operator != (const TProgressUpdateResp &rhs) const {
+    return !(*this == rhs);  // defined as the negation of operator==
+  }
+
+  bool operator < (const TProgressUpdateResp & ) const;  // declaration only; no definition is visible in this part of the patch
+
+  uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+  uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+  virtual void printTo(std::ostream& out) const;
+};
+
+void swap(TProgressUpdateResp &a, TProgressUpdateResp &b);
+
+inline std::ostream& operator<<(std::ostream& out, const TProgressUpdateResp& obj)  // stream insertion; forwards to the virtual printTo
+{
+  obj.printTo(out);
+  return out;  // returns the stream so insertions can be chained
+}
+
 }}}}} // namespace
 
 #endif

http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
index 84c64cd..af31ce2 100644
--- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
@@ -39,6 +39,7 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TGetOperationStatusReq");
 
   private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+  private static final org.apache.thrift.protocol.TField GET_PROGRESS_UPDATE_FIELD_DESC = new org.apache.thrift.protocol.TField("getProgressUpdate", org.apache.thrift.protocol.TType.BOOL, (short)2);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -47,10 +48,12 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
   }
 
   private TOperationHandle operationHandle; // required
+  private boolean getProgressUpdate; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-    OPERATION_HANDLE((short)1, "operationHandle");
+    OPERATION_HANDLE((short)1, "operationHandle"),
+    GET_PROGRESS_UPDATE((short)2, "getProgressUpdate");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -67,6 +70,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
       switch(fieldId) {
         case 1: // OPERATION_HANDLE
           return OPERATION_HANDLE;
+        case 2: // GET_PROGRESS_UPDATE
+          return GET_PROGRESS_UPDATE;
         default:
           return null;
       }
@@ -107,11 +112,16 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
   }
 
   // isset id assignments
+  private static final int __GETPROGRESSUPDATE_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  private static final _Fields optionals[] = {_Fields.GET_PROGRESS_UPDATE};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
     tmpMap.put(_Fields.OPERATION_HANDLE, new org.apache.thrift.meta_data.FieldMetaData("operationHandle", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TOperationHandle.class)));
+    tmpMap.put(_Fields.GET_PROGRESS_UPDATE, new org.apache.thrift.meta_data.FieldMetaData("getProgressUpdate", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusReq.class, metaDataMap);
   }
@@ -130,9 +140,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
    * Performs a deep copy on <i>other</i>.
    */
   public TGetOperationStatusReq(TGetOperationStatusReq other) {
+    __isset_bitfield = other.__isset_bitfield;
     if (other.isSetOperationHandle()) {
       this.operationHandle = new TOperationHandle(other.operationHandle);
     }
+    this.getProgressUpdate = other.getProgressUpdate;
   }
 
   public TGetOperationStatusReq deepCopy() {
@@ -142,6 +154,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
   @Override
   public void clear() {
     this.operationHandle = null;
+    setGetProgressUpdateIsSet(false);
+    this.getProgressUpdate = false;
   }
 
   public TOperationHandle getOperationHandle() {
@@ -167,6 +181,28 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
     }
   }
 
+  public boolean isGetProgressUpdate() {  // getter for the getProgressUpdate field
+    return this.getProgressUpdate;  // raw field value; the isset bit is not consulted
+  }
+
+  public void setGetProgressUpdate(boolean getProgressUpdate) {  // stores the value and marks the field as set
+    this.getProgressUpdate = getProgressUpdate;
+    setGetProgressUpdateIsSet(true);  // an explicit false therefore still counts as "set"
+  }
+
+  public void unsetGetProgressUpdate() {  // clears only the isset bit; the stored boolean is left untouched
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID);
+  }
+
+  /** Returns true if field getProgressUpdate is set (has been assigned a value) and false otherwise */
+  public boolean isSetGetProgressUpdate() {
+    return EncodingUtils.testBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID);  // reads the isset bit, not the field value
+  }
+
+  public void setGetProgressUpdateIsSet(boolean value) {  // sets or clears the isset bit without changing the field value
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case OPERATION_HANDLE:
@@ -177,6 +213,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
       }
       break;
 
+    case GET_PROGRESS_UPDATE:
+      if (value == null) {
+        unsetGetProgressUpdate();
+      } else {
+        setGetProgressUpdate((Boolean)value);
+      }
+      break;
+
     }
   }
 
@@ -185,6 +229,9 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
     case OPERATION_HANDLE:
       return getOperationHandle();
 
+    case GET_PROGRESS_UPDATE:
+      return isGetProgressUpdate();
+
     }
     throw new IllegalStateException();
   }
@@ -198,6 +245,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
     switch (field) {
     case OPERATION_HANDLE:
       return isSetOperationHandle();
+    case GET_PROGRESS_UPDATE:
+      return isSetGetProgressUpdate();
     }
     throw new IllegalStateException();
   }
@@ -224,6 +273,15 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
         return false;
     }
 
+    boolean this_present_getProgressUpdate = true && this.isSetGetProgressUpdate();
+    boolean that_present_getProgressUpdate = true && that.isSetGetProgressUpdate();
+    if (this_present_getProgressUpdate || that_present_getProgressUpdate) {
+      if (!(this_present_getProgressUpdate && that_present_getProgressUpdate))
+        return false;
+      if (this.getProgressUpdate != that.getProgressUpdate)
+        return false;
+    }
+
     return true;
   }
 
@@ -236,6 +294,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
     if (present_operationHandle)
       list.add(operationHandle);
 
+    boolean present_getProgressUpdate = true && (isSetGetProgressUpdate());
+    list.add(present_getProgressUpdate);
+    if (present_getProgressUpdate)
+      list.add(getProgressUpdate);
+
     return list.hashCode();
   }
 
@@ -257,6 +320,16 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetGetProgressUpdate()).compareTo(other.isSetGetProgressUpdate());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetGetProgressUpdate()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.getProgressUpdate, other.getProgressUpdate);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -284,6 +357,12 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
       sb.append(this.operationHandle);
     }
     first = false;
+    if (isSetGetProgressUpdate()) {
+      if (!first) sb.append(", ");
+      sb.append("getProgressUpdate:");
+      sb.append(this.getProgressUpdate);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -310,6 +389,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
 
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
       throw new java.io.IOException(te);
@@ -343,6 +424,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 2: // GET_PROGRESS_UPDATE
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.getProgressUpdate = iprot.readBool();
+              struct.setGetProgressUpdateIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -361,6 +450,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
         struct.operationHandle.write(oprot);
         oprot.writeFieldEnd();
       }
+      if (struct.isSetGetProgressUpdate()) {
+        oprot.writeFieldBegin(GET_PROGRESS_UPDATE_FIELD_DESC);
+        oprot.writeBool(struct.getProgressUpdate);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -379,6 +473,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
     public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusReq struct) throws org.apache.thrift.TException {
       TTupleProtocol oprot = (TTupleProtocol) prot;
       struct.operationHandle.write(oprot);
+      BitSet optionals = new BitSet();  // presence bits for the optional fields; only bit 0 (getProgressUpdate) is used
+      if (struct.isSetGetProgressUpdate()) {
+        optionals.set(0);
+      }
+      oprot.writeBitSet(optionals, 1);  // written even when no optional field is set
+      if (struct.isSetGetProgressUpdate()) {
+        oprot.writeBool(struct.getProgressUpdate);  // the value follows only when its presence bit was set
+      }
     }
 
     @Override
@@ -387,6 +489,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
       struct.operationHandle = new TOperationHandle();
       struct.operationHandle.read(iprot);
       struct.setOperationHandleIsSet(true);
+      BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.getProgressUpdate = iprot.readBool();
+        struct.setGetProgressUpdateIsSet(true);
+      }
     }
   }