You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/02/07 20:59:35 UTC
[67/70] [abbrv] hive git commit: HIVE-15473: Progress Bar on Beeline
client (Anishek Agarwal via Thejas Nair)
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
new file mode 100644
index 0000000..0a28edd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
@@ -0,0 +1,92 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;
+
+public class FSCountersSummary implements PrintSummary {
+
+ private static final String FORMATTING_PATTERN = "%10s %15s %13s %18s %18s %13s";
+ private static final String HEADER = String.format(FORMATTING_PATTERN,
+ "VERTICES", "BYTES_READ", "READ_OPS", "LARGE_READ_OPS", "BYTES_WRITTEN", "WRITE_OPS");
+
+ private Map<String, Progress> progressMap;
+ private DAGClient dagClient;
+
+ FSCountersSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
+ this.progressMap = progressMap;
+ this.dagClient = dagClient;
+ }
+
+ @Override
+ public void print(SessionState.LogHelper console) {
+ console.printInfo("FileSystem Counters Summary");
+
+ SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+ Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+ statusOptions.add(StatusGetOpts.GET_COUNTERS);
+ // Assuming FileSystem.getAllStatistics() returns all schemes that are accessed on task side
+ // as well. If not, we need a way to get all the schemes that are accessed by the tez task/llap.
+ for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) {
+ final String scheme = statistics.getScheme().toUpperCase();
+
+ console.printInfo("");
+ console.printInfo("Scheme: " + scheme);
+ console.printInfo(SEPARATOR);
+ console.printInfo(HEADER);
+ console.printInfo(SEPARATOR);
+
+ for (String vertexName : keys) {
+ TezCounters vertexCounters = vertexCounters(statusOptions, vertexName);
+ if (vertexCounters != null) {
+ console.printInfo(summary(scheme, vertexName, vertexCounters));
+ }
+ }
+
+ console.printInfo(SEPARATOR);
+ }
+ }
+
+ private String summary(String scheme, String vertexName, TezCounters vertexCounters) {
+ final String counterGroup = FileSystemCounter.class.getName();
+ final long bytesRead = getCounterValueByGroupName(vertexCounters,
+ counterGroup, scheme + "_" + FileSystemCounter.BYTES_READ.name());
+ final long bytesWritten = getCounterValueByGroupName(vertexCounters,
+ counterGroup, scheme + "_" + FileSystemCounter.BYTES_WRITTEN.name());
+ final long readOps = getCounterValueByGroupName(vertexCounters,
+ counterGroup, scheme + "_" + FileSystemCounter.READ_OPS.name());
+ final long largeReadOps = getCounterValueByGroupName(vertexCounters,
+ counterGroup, scheme + "_" + FileSystemCounter.LARGE_READ_OPS.name());
+ final long writeOps = getCounterValueByGroupName(vertexCounters,
+ counterGroup, scheme + "_" + FileSystemCounter.WRITE_OPS.name());
+
+ return String.format(FORMATTING_PATTERN,
+ vertexName,
+ Utilities.humanReadableByteCount(bytesRead),
+ readOps,
+ largeReadOps,
+ Utilities.humanReadableByteCount(bytesWritten),
+ writeOps);
+ }
+
+ private TezCounters vertexCounters(Set<StatusGetOpts> statusOptions, String vertexName) {
+ try {
+ return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
+ } catch (IOException | TezException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
new file mode 100644
index 0000000..81f1755
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
@@ -0,0 +1,108 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor.getCounterValueByGroupName;
+
+public class LLAPioSummary implements PrintSummary {
+
+ private static final String LLAP_SUMMARY_HEADER_FORMAT = "%10s %9s %9s %10s %9s %10s %11s %8s %9s";
+ private static final String LLAP_IO_SUMMARY_HEADER = "LLAP IO Summary";
+ private static final String LLAP_SUMMARY_HEADER = String.format(LLAP_SUMMARY_HEADER_FORMAT,
+ "VERTICES", "ROWGROUPS", "META_HIT", "META_MISS", "DATA_HIT", "DATA_MISS",
+ "ALLOCATION", "USED", "TOTAL_IO");
+
+
+
+ private final DecimalFormat secondsFormatter = new DecimalFormat("#0.00");
+ private Map<String, Progress> progressMap;
+ private DAGClient dagClient;
+ private boolean first = false;
+
+ LLAPioSummary(Map<String, Progress> progressMap, DAGClient dagClient) {
+ this.progressMap = progressMap;
+ this.dagClient = dagClient;
+ }
+
+ @Override
+ public void print(SessionState.LogHelper console) {
+ console.printInfo("");
+ console.printInfo(LLAP_IO_SUMMARY_HEADER);
+
+ SortedSet<String> keys = new TreeSet<>(progressMap.keySet());
+ Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+ statusOptions.add(StatusGetOpts.GET_COUNTERS);
+ String counterGroup = LlapIOCounters.class.getName();
+ for (String vertexName : keys) {
+ // Reducers do not benefit from LLAP IO so no point in printing
+ if (vertexName.startsWith("Reducer")) {
+ continue;
+ }
+ TezCounters vertexCounters = vertexCounter(statusOptions, vertexName);
+ if (vertexCounters != null) {
+ if (!first) {
+ console.printInfo(SEPARATOR);
+ console.printInfo(LLAP_SUMMARY_HEADER);
+ console.printInfo(SEPARATOR);
+ first = true;
+ }
+ console.printInfo(vertexSummary(vertexName, counterGroup, vertexCounters));
+ }
+ }
+ console.printInfo(SEPARATOR);
+ console.printInfo("");
+ }
+
+ private String vertexSummary(String vertexName, String counterGroup, TezCounters vertexCounters) {
+ final long selectedRowgroups = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.SELECTED_ROWGROUPS.name());
+ final long metadataCacheHit = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.METADATA_CACHE_HIT.name());
+ final long metadataCacheMiss = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.METADATA_CACHE_MISS.name());
+ final long cacheHitBytes = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.CACHE_HIT_BYTES.name());
+ final long cacheMissBytes = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.CACHE_MISS_BYTES.name());
+ final long allocatedBytes = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.ALLOCATED_BYTES.name());
+ final long allocatedUsedBytes = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.ALLOCATED_USED_BYTES.name());
+ final long totalIoTime = getCounterValueByGroupName(vertexCounters,
+ counterGroup, LlapIOCounters.TOTAL_IO_TIME_NS.name());
+
+
+ return String.format(LLAP_SUMMARY_HEADER_FORMAT,
+ vertexName,
+ selectedRowgroups,
+ metadataCacheHit,
+ metadataCacheMiss,
+ Utilities.humanReadableByteCount(cacheHitBytes),
+ Utilities.humanReadableByteCount(cacheMissBytes),
+ Utilities.humanReadableByteCount(allocatedBytes),
+ Utilities.humanReadableByteCount(allocatedUsedBytes),
+ secondsFormatter.format(totalIoTime / 1000_000_000.0) + "s");
+ }
+
+ private TezCounters vertexCounter(Set<StatusGetOpts> statusOptions, String vertexName) {
+ try {
+ return dagClient.getVertexStatus(vertexName, statusOptions).getVertexCounters();
+ } catch (IOException | TezException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
new file mode 100644
index 0000000..6311335
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/PrintSummary.java
@@ -0,0 +1,7 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+interface PrintSummary {
+ void print(SessionState.LogHelper console);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
new file mode 100644
index 0000000..1625ce1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/QueryExecutionBreakdownSummary.java
@@ -0,0 +1,75 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.text.DecimalFormat;
+
+import static org.apache.hadoop.hive.ql.exec.tez.monitoring.Constants.SEPARATOR;
+
+class QueryExecutionBreakdownSummary implements PrintSummary {
+ // Methods summary
+ private static final String OPERATION_SUMMARY = "%-35s %9s";
+ private static final String OPERATION = "OPERATION";
+ private static final String DURATION = "DURATION";
+
+
+ private DecimalFormat decimalFormat = new DecimalFormat("#0.00");
+ private PerfLogger perfLogger;
+
+ private final Long compileEndTime;
+ private final Long dagSubmitStartTime;
+ private final Long submitToRunningDuration;
+
+ QueryExecutionBreakdownSummary(PerfLogger perfLogger) {
+ this.perfLogger = perfLogger;
+ this.compileEndTime = perfLogger.getEndTime(PerfLogger.COMPILE);
+ this.dagSubmitStartTime = perfLogger.getStartTime(PerfLogger.TEZ_SUBMIT_DAG);
+ this.submitToRunningDuration = perfLogger.getDuration(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+ }
+
+ private String formatNumber(long number) {
+ return decimalFormat.format(number / 1000.0) + "s";
+ }
+
+ private String format(String value, long number) {
+ return String.format(OPERATION_SUMMARY, value, formatNumber(number));
+ }
+
+ public void print(SessionState.LogHelper console) {
+ console.printInfo("Query Execution Summary");
+
+ String execBreakdownHeader = String.format(OPERATION_SUMMARY, OPERATION, DURATION);
+ console.printInfo(SEPARATOR);
+ console.printInfo(execBreakdownHeader);
+ console.printInfo(SEPARATOR);
+
+ // parse, analyze, optimize and compile
+ long compile = compileEndTime - perfLogger.getStartTime(PerfLogger.COMPILE);
+ console.printInfo(format("Compile Query", compile));
+
+ // prepare plan for submission (building DAG, adding resources, creating scratch dirs etc.)
+ long totalDAGPrep = dagSubmitStartTime - compileEndTime;
+ console.printInfo(format("Prepare Plan", totalDAGPrep));
+
+ // submit to accept dag (if session is closed, this will include re-opening of session time,
+ // localizing files for AM, submitting DAG)
+ long submitToAccept = perfLogger.getStartTime(PerfLogger.TEZ_RUN_DAG) - dagSubmitStartTime;
+ console.printInfo(format("Submit Plan", submitToAccept));
+
+ // accept to start dag (schedule wait time, resource wait time etc.)
+ console.printInfo(format("Start DAG", submitToRunningDuration));
+
+ // time to actually run the dag (actual dag runtime)
+ final long startToEnd;
+ if (submitToRunningDuration == 0) {
+ startToEnd = perfLogger.getDuration(PerfLogger.TEZ_RUN_DAG);
+ } else {
+ startToEnd = perfLogger.getEndTime(PerfLogger.TEZ_RUN_DAG) -
+ perfLogger.getEndTime(PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+ }
+ console.printInfo(format("Run DAG", startToEnd));
+ console.printInfo(SEPARATOR);
+ console.printInfo("");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
new file mode 100644
index 0000000..1e54f6e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java
@@ -0,0 +1,397 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ <p>
+ http://www.apache.org/licenses/LICENSE-2.0
+ <p>
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.StringWriter;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING;
+
+/**
+ * TezJobMonitor keeps track of a tez job while it's being executed. It will
+ * print status to the console and retrieve final status of the job after
+ * completion.
+ */
+public class TezJobMonitor {
+
+ private static final String CLASS_NAME = TezJobMonitor.class.getName();
+ private static final int CHECK_INTERVAL = 200;
+ private static final int MAX_RETRY_INTERVAL = 2500;
+ private static final int PRINT_INTERVAL = 3000;
+
+ private final PerfLogger perfLogger = SessionState.getPerfLogger();
+ private static final List<DAGClient> shutdownList;
+ private final Map<String, BaseWork> workMap;
+
+ private transient LogHelper console;
+
+ private long lastPrintTime;
+ private StringWriter diagnostics = new StringWriter();
+
+ interface UpdateFunction {
+ void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report);
+ }
+
+ static {
+ shutdownList = new LinkedList<>();
+ ShutdownHookManager.addShutdownHook(new Runnable() {
+ @Override
+ public void run() {
+ TezJobMonitor.killRunningJobs();
+ try {
+ TezSessionPoolManager.getInstance().closeNonDefaultSessions(false);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ });
+ }
+
+ public static void initShutdownHook() {
+ Preconditions.checkNotNull(shutdownList,
+ "Shutdown hook was not properly initialized");
+ }
+
+ private final DAGClient dagClient;
+ private final HiveConf hiveConf;
+ private final DAG dag;
+ private final Context context;
+ private long executionStartTime = 0;
+ private final UpdateFunction updateFunction;
+ /**
+ * Have to use the same instance to render else the number lines printed earlier is lost and the
+ * screen will print the table again and again.
+ */
+ private final InPlaceUpdate inPlaceUpdate;
+
+ public TezJobMonitor(Map<String, BaseWork> workMap, final DAGClient dagClient, HiveConf conf, DAG dag,
+ Context ctx) {
+ this.workMap = workMap;
+ this.dagClient = dagClient;
+ this.hiveConf = conf;
+ this.dag = dag;
+ this.context = ctx;
+ console = SessionState.getConsole();
+ inPlaceUpdate = new InPlaceUpdate(LogHelper.getInfoStream());
+ updateFunction = updateFunction();
+ }
+
+ private UpdateFunction updateFunction() {
+ UpdateFunction logToFileFunction = new UpdateFunction() {
+ @Override
+ public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
+ SessionState.get().updateProgressMonitor(progressMonitor(status, vertexProgressMap));
+ console.printInfo(report);
+ }
+ };
+ UpdateFunction inPlaceUpdateFunction = new UpdateFunction() {
+ @Override
+ public void update(DAGStatus status, Map<String, Progress> vertexProgressMap, String report) {
+ inPlaceUpdate.render(progressMonitor(status, vertexProgressMap));
+ console.logInfo(report);
+ }
+ };
+ return InPlaceUpdate.canRenderInPlace(hiveConf)
+ && !SessionState.getConsole().getIsSilent()
+ && !SessionState.get().isHiveServerQuery()
+ ? inPlaceUpdateFunction : logToFileFunction;
+ }
+
+ private boolean isProfilingEnabled() {
+ return HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
+ Utilities.isPerfOrAboveLogging(hiveConf);
+ }
+
+ public int monitorExecution() {
+ boolean done = false;
+ boolean success = false;
+ int failedCounter = 0;
+ int rc = 0;
+ DAGStatus status = null;
+ Map<String, Progress> vertexProgressMap = null;
+
+
+ long monitorStartTime = System.currentTimeMillis();
+ synchronized (shutdownList) {
+ shutdownList.add(dagClient);
+ }
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+ DAGStatus.State lastState = null;
+ String lastReport = null;
+ boolean running = false;
+
+ while (true) {
+
+ try {
+ if (context != null) {
+ context.checkHeartbeaterLockException();
+ }
+
+ status = dagClient.getDAGStatus(new HashSet<StatusGetOpts>(), CHECK_INTERVAL);
+ vertexProgressMap = status.getVertexProgress();
+ DAGStatus.State state = status.getState();
+
+ if (state != lastState || state == RUNNING) {
+ lastState = state;
+
+ switch (state) {
+ case SUBMITTED:
+ console.printInfo("Status: Submitted");
+ break;
+ case INITING:
+ console.printInfo("Status: Initializing");
+ this.executionStartTime = System.currentTimeMillis();
+ break;
+ case RUNNING:
+ if (!running) {
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_TO_RUNNING);
+ console.printInfo("Status: Running (" + dagClient.getExecutionContext() + ")\n");
+ this.executionStartTime = System.currentTimeMillis();
+ running = true;
+ }
+ lastReport = updateStatus(status, vertexProgressMap, lastReport);
+ break;
+ case SUCCEEDED:
+ if (!running) {
+ this.executionStartTime = monitorStartTime;
+ }
+ lastReport = updateStatus(status, vertexProgressMap, lastReport);
+ success = true;
+ running = false;
+ done = true;
+ break;
+ case KILLED:
+ if (!running) {
+ this.executionStartTime = monitorStartTime;
+ }
+ lastReport = updateStatus(status, vertexProgressMap, lastReport);
+ console.printInfo("Status: Killed");
+ running = false;
+ done = true;
+ rc = 1;
+ break;
+ case FAILED:
+ case ERROR:
+ if (!running) {
+ this.executionStartTime = monitorStartTime;
+ }
+ lastReport = updateStatus(status, vertexProgressMap, lastReport);
+ console.printError("Status: Failed");
+ running = false;
+ done = true;
+ rc = 2;
+ break;
+ }
+ }
+ } catch (Exception e) {
+ console.printInfo("Exception: " + e.getMessage());
+ boolean isInterrupted = hasInterruptedException(e);
+ if (isInterrupted || (++failedCounter % MAX_RETRY_INTERVAL / CHECK_INTERVAL == 0)) {
+ try {
+ console.printInfo("Killing DAG...");
+ dagClient.tryKillDAG();
+ } catch (IOException | TezException tezException) {
+ // best effort
+ }
+ console
+ .printError("Execution has failed. stack trace: " + ExceptionUtils.getStackTrace(e));
+ rc = 1;
+ done = true;
+ } else {
+ console.printInfo("Retrying...");
+ }
+ } finally {
+ if (done) {
+ if (rc != 0 && status != null) {
+ for (String diag : status.getDiagnostics()) {
+ console.printError(diag);
+ diagnostics.append(diag);
+ }
+ }
+ synchronized (shutdownList) {
+ shutdownList.remove(dagClient);
+ }
+ break;
+ }
+ }
+ }
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG);
+ printSummary(success, vertexProgressMap);
+ return rc;
+ }
+
+ private void printSummary(boolean success, Map<String, Progress> progressMap) {
+ if (isProfilingEnabled() && success && progressMap != null) {
+
+ double duration = (System.currentTimeMillis() - this.executionStartTime) / 1000.0;
+ console.printInfo("Status: DAG finished successfully in " + String.format("%.2f seconds", duration));
+ console.printInfo("");
+
+ new QueryExecutionBreakdownSummary(perfLogger).print(console);
+ new DAGSummary(progressMap, hiveConf, dagClient, dag, perfLogger).print(console);
+
+ //llap IO summary
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.LLAP_IO_ENABLED, false)) {
+ new LLAPioSummary(progressMap, dagClient).print(console);
+ new FSCountersSummary(progressMap, dagClient).print(console);
+ }
+ console.printInfo("");
+ }
+ }
+
+ private static boolean hasInterruptedException(Throwable e) {
+ // Hadoop IPC wraps InterruptedException. GRRR.
+ while (e != null) {
+ if (e instanceof InterruptedException || e instanceof InterruptedIOException) {
+ return true;
+ }
+ e = e.getCause();
+ }
+ return false;
+ }
+
+ /**
+ * killRunningJobs tries to terminate execution of all
+ * currently running tez queries. No guarantees, best effort only.
+ */
+ private static void killRunningJobs() {
+ synchronized (shutdownList) {
+ for (DAGClient c : shutdownList) {
+ try {
+ System.err.println("Trying to shutdown DAG");
+ c.tryKillDAG();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern,
+ String counterName) {
+ TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName);
+ return (tezCounter == null) ? 0 : tezCounter.getValue();
+ }
+
+ private String updateStatus(DAGStatus status, Map<String, Progress> vertexProgressMap,
+ String lastReport) {
+ String report = getReport(vertexProgressMap);
+ if (!report.equals(lastReport) || System.currentTimeMillis() >= lastPrintTime + PRINT_INTERVAL) {
+ updateFunction.update(status, vertexProgressMap, report);
+ lastPrintTime = System.currentTimeMillis();
+ }
+ return report;
+ }
+
+ private String getReport(Map<String, Progress> progressMap) {
+ StringBuilder reportBuffer = new StringBuilder();
+
+ SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+ for (String s : keys) {
+ Progress progress = progressMap.get(s);
+ final int complete = progress.getSucceededTaskCount();
+ final int total = progress.getTotalTaskCount();
+ final int running = progress.getRunningTaskCount();
+ final int failed = progress.getFailedTaskAttemptCount();
+ if (total <= 0) {
+ reportBuffer.append(String.format("%s: -/-\t", s));
+ } else {
+ if (complete == total) {
+ /*
+ * We may have missed the start of the vertex due to the 3 seconds interval
+ */
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+ if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+
+ if (!perfLogger.startTimeHasMethod(PerfLogger.TEZ_RUN_VERTEX + s)) {
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_VERTEX + s);
+ }
+
+ /* vertex is started, but not complete */
+ if (failed > 0) {
+ reportBuffer.append(String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+ }
+ } else {
+ /* vertex is waiting for input/slots or complete */
+ if (failed > 0) {
+ /* tasks finished but some failed */
+ reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
+ } else {
+ reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+ }
+ }
+ }
+ }
+
+ return reportBuffer.toString();
+ }
+
+ public String getDiagnostics() {
+ return diagnostics.toString();
+ }
+
+ private ProgressMonitor progressMonitor(DAGStatus status, Map<String, Progress> progressMap) {
+ try {
+ return new TezProgressMonitor(dagClient, status, workMap, progressMap, console,
+ executionStartTime);
+ } catch (IOException | TezException e) {
+ console.printInfo("Getting Progress Information: " + e.getMessage() + " stack trace: " +
+ ExceptionUtils.getStackTrace(e));
+ }
+ return TezProgressMonitor.NULL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
new file mode 100644
index 0000000..3475fc2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java
@@ -0,0 +1,313 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.apache.tez.dag.api.client.DAGStatus.State.KILLED;
+
+class TezProgressMonitor implements ProgressMonitor {
+ private static final int COLUMN_1_WIDTH = 16;
+ private final Map<String, BaseWork> workMap;
+ private final SessionState.LogHelper console;
+ private final long executionStartTime;
+ private final DAGStatus status;
+ Map<String, VertexStatus> vertexStatusMap = new HashMap<>();
+ Map<String, VertexProgress> progressCountsMap = new HashMap<>();
+
+ /**
+ * Try to get most the data required from dagClient in the constructor itself so that even after
+ * the tez job has finished this object can be used for later use.s
+ */
+ TezProgressMonitor(DAGClient dagClient, DAGStatus status, Map<String, BaseWork> workMap,
+ Map<String, Progress> progressMap, SessionState.LogHelper console, long executionStartTime)
+ throws IOException, TezException {
+ this.status = status;
+ this.workMap = workMap;
+ this.console = console;
+ this.executionStartTime = executionStartTime;
+ for (Map.Entry<String, Progress> entry : progressMap.entrySet()) {
+ String vertexName = entry.getKey();
+ progressCountsMap.put(vertexName, new VertexProgress(entry.getValue(), status.getState()));
+ try {
+ vertexStatusMap.put(vertexName, dagClient.getVertexStatus(vertexName, null));
+ } catch (IOException e) {
+ // best attempt, shouldn't really kill DAG for this
+ }
+ }
+ }
+
+ public List<String> headers() {
+ return Arrays.asList(
+ "VERTICES",
+ "MODE",
+ "STATUS",
+ "TOTAL",
+ "COMPLETED",
+ "RUNNING",
+ "PENDING",
+ "FAILED",
+ "KILLED"
+ );
+ }
+
+ public List<List<String>> rows() {
+ try {
+ List<List<String>> results = new ArrayList<>();
+ SortedSet<String> keys = new TreeSet<>(progressCountsMap.keySet());
+ for (String s : keys) {
+ VertexProgress progress = progressCountsMap.get(s);
+
+ // Map 1 .......... container SUCCEEDED 7 7 0 0 0 0
+
+ results.add(
+ Arrays.asList(
+ getNameWithProgress(s, progress.succeededTaskCount, progress.totalTaskCount),
+ getMode(s, workMap),
+ progress.vertexStatus(vertexStatusMap.get(s)),
+ progress.total(),
+ progress.completed(),
+ progress.running(),
+ progress.pending(),
+ progress.failed(),
+ progress.killed()
+ )
+ );
+ }
+ return results;
+ } catch (Exception e) {
+ console.printInfo(
+ "Getting Progress Bar table rows failed: " + e.getMessage() + " stack trace: " + Arrays
+ .toString(e.getStackTrace())
+ );
+ }
+ return Collections.emptyList();
+ }
+
+ // -------------------------------------------------------------------------------
+ // VERTICES: 03/04 [=================>>-----] 86% ELAPSED TIME: 1.71 s
+ // -------------------------------------------------------------------------------
+ // contains footerSummary , progressedPercentage, starTime
+
+ @Override
+ public String footerSummary() {
+ return String.format("VERTICES: %02d/%02d", completed(), progressCountsMap.keySet().size());
+ }
+
+ @Override
+ public long startTime() {
+ return executionStartTime;
+ }
+
+ @Override
+ public double progressedPercentage() {
+ int sumTotal = 0, sumComplete = 0;
+ for (String s : progressCountsMap.keySet()) {
+ VertexProgress progress = progressCountsMap.get(s);
+ final int complete = progress.succeededTaskCount;
+ final int total = progress.totalTaskCount;
+ if (total > 0) {
+ sumTotal += total;
+ sumComplete += complete;
+ }
+ }
+ return (sumTotal == 0) ? 0.0f : (float) sumComplete / (float) sumTotal;
+ }
+
+ @Override
+ public String executionStatus() {
+ return this.status.getState().name();
+ }
+
+ private int completed() {
+ Set<String> completed = new HashSet<>();
+ for (String s : progressCountsMap.keySet()) {
+ VertexProgress progress = progressCountsMap.get(s);
+ final int complete = progress.succeededTaskCount;
+ final int total = progress.totalTaskCount;
+ if (total > 0) {
+ if (complete == total) {
+ completed.add(s);
+ }
+ }
+ }
+ return completed.size();
+ }
+
+ // Map 1 ..........
+
+ private String getNameWithProgress(String s, int complete, int total) {
+ String result = "";
+ if (s != null) {
+ float percent = total == 0 ? 0.0f : (float) complete / (float) total;
+ // lets use the remaining space in column 1 as progress bar
+ int spaceRemaining = COLUMN_1_WIDTH - s.length() - 1;
+ String trimmedVName = s;
+
+ // if the vertex name is longer than column 1 width, trim it down
+ // "Tez Merge File Work" will become "Tez Merge File.."
+ if (s.length() > COLUMN_1_WIDTH) {
+ trimmedVName = s.substring(0, COLUMN_1_WIDTH - 1);
+ trimmedVName = trimmedVName + "..";
+ }
+
+ result = trimmedVName + " ";
+ int toFill = (int) (spaceRemaining * percent);
+ for (int i = 0; i < toFill; i++) {
+ result += ".";
+ }
+ }
+ return result;
+ }
+
+ private String getMode(String name, Map<String, BaseWork> workMap) {
+ String mode = "container";
+ BaseWork work = workMap.get(name);
+ if (work != null) {
+ // uber > llap > container
+ if (work.getUberMode()) {
+ mode = "uber";
+ } else if (work.getLlapMode()) {
+ mode = "llap";
+ } else {
+ mode = "container";
+ }
+ }
+ return mode;
+ }
+
+ static class VertexProgress {
+ private final int totalTaskCount;
+ private final int succeededTaskCount;
+ private final int failedTaskAttemptCount;
+ private final long killedTaskAttemptCount;
+ private final int runningTaskCount;
+ private final DAGStatus.State dagState;
+
+ VertexProgress(Progress progress, DAGStatus.State dagState) {
+ this(progress.getTotalTaskCount(), progress.getSucceededTaskCount(),
+ progress.getFailedTaskAttemptCount(), progress.getKilledTaskAttemptCount(),
+ progress.getRunningTaskCount(), dagState);
+ }
+
+ VertexProgress(int totalTaskCount, int succeededTaskCount, int failedTaskAttemptCount,
+ int killedTaskAttemptCount, int runningTaskCount, DAGStatus.State dagState) {
+ this.totalTaskCount = totalTaskCount;
+ this.succeededTaskCount = succeededTaskCount;
+ this.failedTaskAttemptCount = failedTaskAttemptCount;
+ this.killedTaskAttemptCount = killedTaskAttemptCount;
+ this.runningTaskCount = runningTaskCount;
+ this.dagState = dagState;
+ }
+
+ boolean isRunning() {
+ return succeededTaskCount < totalTaskCount && (succeededTaskCount > 0 || runningTaskCount > 0
+ || failedTaskAttemptCount > 0);
+ }
+
+ String vertexStatus(VertexStatus vertexStatus) {
+ // To get vertex status we can use DAGClient.getVertexStatus(), but it will be expensive to
+ // get status from AM for every refresh of the UI. Lets infer the state from task counts.
+ // Only if DAG is FAILED or KILLED the vertex status is fetched from AM.
+ VertexStatus.State vertexState = VertexStatus.State.INITIALIZING;
+ if (totalTaskCount > 0) {
+ vertexState = VertexStatus.State.INITED;
+ }
+
+ // RUNNING state
+ if (isRunning()) {
+ vertexState = VertexStatus.State.RUNNING;
+ }
+
+ // SUCCEEDED state
+ if (succeededTaskCount == totalTaskCount) {
+ vertexState = VertexStatus.State.SUCCEEDED;
+ }
+
+ // DAG might have been killed, lets try to get vertex state from AM before dying
+ // KILLED or FAILED state
+ if (dagState == KILLED) {
+ if (vertexStatus != null) {
+ vertexState = vertexStatus.getState();
+ }
+ }
+ return vertexState.toString();
+ }
+
+ // "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED"
+
+ String total() {
+ return String.valueOf(totalTaskCount);
+ }
+
+ String completed() {
+ return String.valueOf(succeededTaskCount);
+ }
+
+ String running() {
+ return String.valueOf(runningTaskCount);
+ }
+
+ String pending() {
+ return String.valueOf(totalTaskCount - succeededTaskCount - runningTaskCount);
+ }
+
+ String failed() {
+ return String.valueOf(failedTaskAttemptCount);
+ }
+
+ String killed() {
+ return String.valueOf(killedTaskAttemptCount);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ VertexProgress that = (VertexProgress) o;
+
+ if (totalTaskCount != that.totalTaskCount)
+ return false;
+ if (succeededTaskCount != that.succeededTaskCount)
+ return false;
+ if (failedTaskAttemptCount != that.failedTaskAttemptCount)
+ return false;
+ if (killedTaskAttemptCount != that.killedTaskAttemptCount)
+ return false;
+ if (runningTaskCount != that.runningTaskCount)
+ return false;
+ return dagState == that.dagState;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = totalTaskCount;
+ result = 31 * result + succeededTaskCount;
+ result = 31 * result + failedTaskAttemptCount;
+ result = 31 * result + (int) (killedTaskAttemptCount ^ (killedTaskAttemptCount >>> 32));
+ result = 31 * result + runningTaskCount;
+ result = 31 * result + dagState.hashCode();
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c5b3517..ed854bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -26,8 +26,7 @@ import static org.apache.hadoop.hive.serde.serdeConstants.LINE_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.MAPKEY_DELIM;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
@@ -127,11 +126,11 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.FunctionTask;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
-import org.apache.hadoop.hive.ql.exec.InPlaceUpdates;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.common.log.InPlaceUpdate;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
@@ -1898,7 +1897,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
final AtomicInteger partitionsLoaded = new AtomicInteger(0);
final boolean inPlaceEligible = conf.getLong("fs.trash.interval", 0) <= 0
- && InPlaceUpdates.inPlaceEligible(conf);
+ && InPlaceUpdate.canRenderInPlace(conf) && !SessionState.getConsole().getIsSilent();
final PrintStream ps = (inPlaceEligible) ? SessionState.getConsole().getInfoStream() : null;
final SessionState parentSession = SessionState.get();
@@ -1926,9 +1925,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (inPlaceEligible) {
synchronized (ps) {
- InPlaceUpdates.rePositionCursor(ps);
+ InPlaceUpdate.rePositionCursor(ps);
partitionsLoaded.incrementAndGet();
- InPlaceUpdates.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+ InPlaceUpdate.reprintLine(ps, "Loaded : " + partitionsLoaded.get() + "/"
+ partsToLoad + " partitions.");
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index d607f61..3e01e92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.common.log.ProgressMonitor;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -185,6 +186,7 @@ public class SessionState {
private HiveAuthorizationProvider authorizer;
private HiveAuthorizer authorizerV2;
+ private volatile ProgressMonitor progressMonitor;
public enum AuthorizationMode{V1, V2};
@@ -1564,6 +1566,7 @@ public class SessionState {
// removes the threadlocal variables, closes underlying HMS connection
Hive.closeCurrent();
}
+ progressMonitor = null;
}
private void unCacheDataNucleusClassLoaders() {
@@ -1744,6 +1747,15 @@ public class SessionState {
public String getReloadableAuxJars() {
return StringUtils.join(preReloadableAuxJars, ',');
}
+
+ public void updateProgressMonitor(ProgressMonitor progressMonitor) {
+ this.progressMonitor = progressMonitor;
+ }
+
+ public ProgressMonitor getProgressMonitor() {
+ return progressMonitor;
+ }
+
}
class ResourceMaps {
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
new file mode 100644
index 0000000..648d625
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitor.java
@@ -0,0 +1,101 @@
+package org.apache.hadoop.hive.ql.exec.tez.monitoring;
+
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestTezProgressMonitor {
+
+ private static final String REDUCER = "Reducer";
+ private static final String MAPPER = "Mapper";
+ @Mock
+ private DAGClient dagClient;
+ @Mock
+ private SessionState.LogHelper console;
+ @Mock
+ private DAGStatus dagStatus;
+ @Mock
+ private Progress mapperProgress;
+ @Mock
+ private Progress reducerProgress;
+ @Mock
+ private VertexStatus succeeded;
+ @Mock
+ private VertexStatus running;
+
+ private Map<String, Progress> progressMap() {
+ return new HashMap<String, Progress>() {{
+ put(MAPPER, setup(mapperProgress, 2, 1, 3, 4, 5));
+ put(REDUCER, setup(reducerProgress, 3, 2, 1, 0, 1));
+ }};
+ }
+
+ private Progress setup(Progress progressMock, int total, int succeeded, int failedAttempt,
+ int killedAttempt, int running) {
+ when(progressMock.getTotalTaskCount()).thenReturn(total);
+ when(progressMock.getSucceededTaskCount()).thenReturn(succeeded);
+ when(progressMock.getFailedTaskAttemptCount()).thenReturn(failedAttempt);
+ when(progressMock.getKilledTaskAttemptCount()).thenReturn(killedAttempt);
+ when(progressMock.getRunningTaskCount()).thenReturn(running);
+ return progressMock;
+ }
+
+ @Test
+ public void setupInternalStateOnObjectCreation() throws IOException, TezException {
+ when(dagStatus.getState()).thenReturn(DAGStatus.State.RUNNING);
+ when(dagClient.getVertexStatus(eq(MAPPER), anySet())).thenReturn(succeeded);
+ when(dagClient.getVertexStatus(eq(REDUCER), anySet())).thenReturn(running);
+
+ TezProgressMonitor monitor =
+ new TezProgressMonitor(dagClient, dagStatus, new HashMap<String, BaseWork>(), progressMap(), console,
+ Long.MAX_VALUE);
+
+ verify(dagClient).getVertexStatus(eq(MAPPER), isNull(Set.class));
+ verify(dagClient).getVertexStatus(eq(REDUCER), isNull(Set.class));
+ verifyNoMoreInteractions(dagClient);
+
+ assertThat(monitor.vertexStatusMap.keySet(), hasItems(MAPPER, REDUCER));
+ assertThat(monitor.vertexStatusMap.get(MAPPER), is(sameInstance(succeeded)));
+ assertThat(monitor.vertexStatusMap.get(REDUCER), is(sameInstance(running)));
+
+ assertThat(monitor.progressCountsMap.keySet(), hasItems(MAPPER, REDUCER));
+ TezProgressMonitor.VertexProgress expectedMapperState =
+ new TezProgressMonitor.VertexProgress(2, 1, 3, 4, 5, DAGStatus.State.RUNNING);
+ assertThat(monitor.progressCountsMap.get(MAPPER), is(equalTo(expectedMapperState)));
+
+ TezProgressMonitor.VertexProgress expectedReducerState =
+ new TezProgressMonitor.VertexProgress(3, 2, 1, 0, 1, DAGStatus.State.RUNNING);
+ assertThat(monitor.progressCountsMap.get(REDUCER), is(equalTo(expectedReducerState)));
+
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/if/TCLIService.thrift
----------------------------------------------------------------------
diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift
index a4fa7b0..824b049 100644
--- a/service-rpc/if/TCLIService.thrift
+++ b/service-rpc/if/TCLIService.thrift
@@ -63,6 +63,9 @@ enum TProtocolVersion {
// V9 adds support for serializing ResultSets in SerDe
HIVE_CLI_SERVICE_PROTOCOL_V9
+
+ // V10 adds support for in place updates via GetOperationStatus
+ HIVE_CLI_SERVICE_PROTOCOL_V10
}
enum TTypeId {
@@ -559,7 +562,7 @@ struct TOperationHandle {
// which operations may be executed.
struct TOpenSessionReq {
// The version of the HiveServer2 protocol that the client is using.
- 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9
+ 1: required TProtocolVersion client_protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
// Username and password for authentication.
// Depending on the authentication scheme being used,
@@ -578,7 +581,7 @@ struct TOpenSessionResp {
1: required TStatus status
// The protocol version that the server is using.
- 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9
+ 2: required TProtocolVersion serverProtocolVersion = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
// Session Handle
3: optional TSessionHandle sessionHandle
@@ -1019,6 +1022,8 @@ struct TGetCrossReferenceResp {
struct TGetOperationStatusReq {
// Session to run this request against
1: required TOperationHandle operationHandle
+ // optional arguments to get progress information
+ 2: optional bool getProgressUpdate
}
struct TGetOperationStatusResp {
@@ -1047,6 +1052,8 @@ struct TGetOperationStatusResp {
// If the operation has the result
9: optional bool hasResultSet
+ 10: optional TProgressUpdateResp progressUpdateResponse
+
}
@@ -1202,6 +1209,21 @@ struct TRenewDelegationTokenResp {
1: required TStatus status
}
+enum TJobExecutionStatus {
+ IN_PROGRESS,
+ COMPLETE,
+ NOT_AVAILABLE
+}
+
+struct TProgressUpdateResp {
+ 1: required list<string> headerNames
+ 2: required list<list<string>> rows
+ 3: required double progressedPercentage
+ 4: required TJobExecutionStatus status
+ 5: required string footerSummary
+ 6: required i64 startTime
+}
+
service TCLIService {
TOpenSessionResp OpenSession(1:TOpenSessionReq req);
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
index 2f460e8..0a17e0e 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp
@@ -269,6 +269,18 @@ const char* _kTFetchOrientationNames[] = {
};
const std::map<int, const char*> _TFetchOrientation_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kTFetchOrientationValues, _kTFetchOrientationNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+int _kTJobExecutionStatusValues[] = {
+ TJobExecutionStatus::IN_PROGRESS,
+ TJobExecutionStatus::COMPLETE,
+ TJobExecutionStatus::NOT_AVAILABLE
+};
+const char* _kTJobExecutionStatusNames[] = {
+ "IN_PROGRESS",
+ "COMPLETE",
+ "NOT_AVAILABLE"
+};
+const std::map<int, const char*> _TJobExecutionStatus_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(3, _kTJobExecutionStatusValues, _kTJobExecutionStatusNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+
TTypeQualifierValue::~TTypeQualifierValue() throw() {
}
@@ -8174,6 +8186,11 @@ void TGetOperationStatusReq::__set_operationHandle(const TOperationHandle& val)
this->operationHandle = val;
}
+void TGetOperationStatusReq::__set_getProgressUpdate(const bool val) {
+ this->getProgressUpdate = val;
+__isset.getProgressUpdate = true;
+}
+
uint32_t TGetOperationStatusReq::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -8204,6 +8221,14 @@ uint32_t TGetOperationStatusReq::read(::apache::thrift::protocol::TProtocol* ipr
xfer += iprot->skip(ftype);
}
break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_BOOL) {
+ xfer += iprot->readBool(this->getProgressUpdate);
+ this->__isset.getProgressUpdate = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -8227,6 +8252,11 @@ uint32_t TGetOperationStatusReq::write(::apache::thrift::protocol::TProtocol* op
xfer += this->operationHandle.write(oprot);
xfer += oprot->writeFieldEnd();
+ if (this->__isset.getProgressUpdate) {
+ xfer += oprot->writeFieldBegin("getProgressUpdate", ::apache::thrift::protocol::T_BOOL, 2);
+ xfer += oprot->writeBool(this->getProgressUpdate);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -8235,19 +8265,26 @@ uint32_t TGetOperationStatusReq::write(::apache::thrift::protocol::TProtocol* op
void swap(TGetOperationStatusReq &a, TGetOperationStatusReq &b) {
using ::std::swap;
swap(a.operationHandle, b.operationHandle);
+ swap(a.getProgressUpdate, b.getProgressUpdate);
+ swap(a.__isset, b.__isset);
}
TGetOperationStatusReq::TGetOperationStatusReq(const TGetOperationStatusReq& other268) {
operationHandle = other268.operationHandle;
+ getProgressUpdate = other268.getProgressUpdate;
+ __isset = other268.__isset;
}
TGetOperationStatusReq& TGetOperationStatusReq::operator=(const TGetOperationStatusReq& other269) {
operationHandle = other269.operationHandle;
+ getProgressUpdate = other269.getProgressUpdate;
+ __isset = other269.__isset;
return *this;
}
void TGetOperationStatusReq::printTo(std::ostream& out) const {
using ::apache::thrift::to_string;
out << "TGetOperationStatusReq(";
out << "operationHandle=" << to_string(operationHandle);
+ out << ", " << "getProgressUpdate="; (__isset.getProgressUpdate ? (out << to_string(getProgressUpdate)) : (out << "<null>"));
out << ")";
}
@@ -8300,6 +8337,11 @@ void TGetOperationStatusResp::__set_hasResultSet(const bool val) {
__isset.hasResultSet = true;
}
+void TGetOperationStatusResp::__set_progressUpdateResponse(const TProgressUpdateResp& val) {
+ this->progressUpdateResponse = val;
+__isset.progressUpdateResponse = true;
+}
+
uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* iprot) {
apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
@@ -8396,6 +8438,14 @@ uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* ip
xfer += iprot->skip(ftype);
}
break;
+ case 10:
+ if (ftype == ::apache::thrift::protocol::T_STRUCT) {
+ xfer += this->progressUpdateResponse.read(iprot);
+ this->__isset.progressUpdateResponse = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -8459,6 +8509,11 @@ uint32_t TGetOperationStatusResp::write(::apache::thrift::protocol::TProtocol* o
xfer += oprot->writeBool(this->hasResultSet);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.progressUpdateResponse) {
+ xfer += oprot->writeFieldBegin("progressUpdateResponse", ::apache::thrift::protocol::T_STRUCT, 10);
+ xfer += this->progressUpdateResponse.write(oprot);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -8475,6 +8530,7 @@ void swap(TGetOperationStatusResp &a, TGetOperationStatusResp &b) {
swap(a.operationStarted, b.operationStarted);
swap(a.operationCompleted, b.operationCompleted);
swap(a.hasResultSet, b.hasResultSet);
+ swap(a.progressUpdateResponse, b.progressUpdateResponse);
swap(a.__isset, b.__isset);
}
@@ -8488,6 +8544,7 @@ TGetOperationStatusResp::TGetOperationStatusResp(const TGetOperationStatusResp&
operationStarted = other271.operationStarted;
operationCompleted = other271.operationCompleted;
hasResultSet = other271.hasResultSet;
+ progressUpdateResponse = other271.progressUpdateResponse;
__isset = other271.__isset;
}
TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationStatusResp& other272) {
@@ -8500,6 +8557,7 @@ TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationS
operationStarted = other272.operationStarted;
operationCompleted = other272.operationCompleted;
hasResultSet = other272.hasResultSet;
+ progressUpdateResponse = other272.progressUpdateResponse;
__isset = other272.__isset;
return *this;
}
@@ -8515,6 +8573,7 @@ void TGetOperationStatusResp::printTo(std::ostream& out) const {
out << ", " << "operationStarted="; (__isset.operationStarted ? (out << to_string(operationStarted)) : (out << "<null>"));
out << ", " << "operationCompleted="; (__isset.operationCompleted ? (out << to_string(operationCompleted)) : (out << "<null>"));
out << ", " << "hasResultSet="; (__isset.hasResultSet ? (out << to_string(hasResultSet)) : (out << "<null>"));
+ out << ", " << "progressUpdateResponse="; (__isset.progressUpdateResponse ? (out << to_string(progressUpdateResponse)) : (out << "<null>"));
out << ")";
}
@@ -9984,4 +10043,267 @@ void TRenewDelegationTokenResp::printTo(std::ostream& out) const {
out << ")";
}
+
+TProgressUpdateResp::~TProgressUpdateResp() throw() {
+}
+
+
+void TProgressUpdateResp::__set_headerNames(const std::vector<std::string> & val) {
+ this->headerNames = val;
+}
+
+void TProgressUpdateResp::__set_rows(const std::vector<std::vector<std::string> > & val) {
+ this->rows = val;
+}
+
+void TProgressUpdateResp::__set_progressedPercentage(const double val) {
+ this->progressedPercentage = val;
+}
+
+void TProgressUpdateResp::__set_status(const TJobExecutionStatus::type val) {
+ this->status = val;
+}
+
+void TProgressUpdateResp::__set_footerSummary(const std::string& val) {
+ this->footerSummary = val;
+}
+
+void TProgressUpdateResp::__set_startTime(const int64_t val) {
+ this->startTime = val;
+}
+
+uint32_t TProgressUpdateResp::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+ bool isset_headerNames = false;
+ bool isset_rows = false;
+ bool isset_progressedPercentage = false;
+ bool isset_status = false;
+ bool isset_footerSummary = false;
+ bool isset_startTime = false;
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_LIST) {
+ {
+ this->headerNames.clear();
+ uint32_t _size302;
+ ::apache::thrift::protocol::TType _etype305;
+ xfer += iprot->readListBegin(_etype305, _size302);
+ this->headerNames.resize(_size302);
+ uint32_t _i306;
+ for (_i306 = 0; _i306 < _size302; ++_i306)
+ {
+ xfer += iprot->readString(this->headerNames[_i306]);
+ }
+ xfer += iprot->readListEnd();
+ }
+ isset_headerNames = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 2:
+ if (ftype == ::apache::thrift::protocol::T_LIST) {
+ {
+ this->rows.clear();
+ uint32_t _size307;
+ ::apache::thrift::protocol::TType _etype310;
+ xfer += iprot->readListBegin(_etype310, _size307);
+ this->rows.resize(_size307);
+ uint32_t _i311;
+ for (_i311 = 0; _i311 < _size307; ++_i311)
+ {
+ {
+ this->rows[_i311].clear();
+ uint32_t _size312;
+ ::apache::thrift::protocol::TType _etype315;
+ xfer += iprot->readListBegin(_etype315, _size312);
+ this->rows[_i311].resize(_size312);
+ uint32_t _i316;
+ for (_i316 = 0; _i316 < _size312; ++_i316)
+ {
+ xfer += iprot->readString(this->rows[_i311][_i316]);
+ }
+ xfer += iprot->readListEnd();
+ }
+ }
+ xfer += iprot->readListEnd();
+ }
+ isset_rows = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 3:
+ if (ftype == ::apache::thrift::protocol::T_DOUBLE) {
+ xfer += iprot->readDouble(this->progressedPercentage);
+ isset_progressedPercentage = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 4:
+ if (ftype == ::apache::thrift::protocol::T_I32) {
+ int32_t ecast317;
+ xfer += iprot->readI32(ecast317);
+ this->status = (TJobExecutionStatus::type)ecast317;
+ isset_status = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 5:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->footerSummary);
+ isset_footerSummary = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ case 6:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->startTime);
+ isset_startTime = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ if (!isset_headerNames)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_rows)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_progressedPercentage)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_status)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_footerSummary)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_startTime)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ return xfer;
+}
+
+uint32_t TProgressUpdateResp::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+ xfer += oprot->writeStructBegin("TProgressUpdateResp");
+
+ xfer += oprot->writeFieldBegin("headerNames", ::apache::thrift::protocol::T_LIST, 1);
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>(this->headerNames.size()));
+ std::vector<std::string> ::const_iterator _iter318;
+ for (_iter318 = this->headerNames.begin(); _iter318 != this->headerNames.end(); ++_iter318)
+ {
+ xfer += oprot->writeString((*_iter318));
+ }
+ xfer += oprot->writeListEnd();
+ }
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("rows", ::apache::thrift::protocol::T_LIST, 2);
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_LIST, static_cast<uint32_t>(this->rows.size()));
+ std::vector<std::vector<std::string> > ::const_iterator _iter319;
+ for (_iter319 = this->rows.begin(); _iter319 != this->rows.end(); ++_iter319)
+ {
+ {
+ xfer += oprot->writeListBegin(::apache::thrift::protocol::T_STRING, static_cast<uint32_t>((*_iter319).size()));
+ std::vector<std::string> ::const_iterator _iter320;
+ for (_iter320 = (*_iter319).begin(); _iter320 != (*_iter319).end(); ++_iter320)
+ {
+ xfer += oprot->writeString((*_iter320));
+ }
+ xfer += oprot->writeListEnd();
+ }
+ }
+ xfer += oprot->writeListEnd();
+ }
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("progressedPercentage", ::apache::thrift::protocol::T_DOUBLE, 3);
+ xfer += oprot->writeDouble(this->progressedPercentage);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("status", ::apache::thrift::protocol::T_I32, 4);
+ xfer += oprot->writeI32((int32_t)this->status);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("footerSummary", ::apache::thrift::protocol::T_STRING, 5);
+ xfer += oprot->writeString(this->footerSummary);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldBegin("startTime", ::apache::thrift::protocol::T_I64, 6);
+ xfer += oprot->writeI64(this->startTime);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+void swap(TProgressUpdateResp &a, TProgressUpdateResp &b) {
+ using ::std::swap;
+ swap(a.headerNames, b.headerNames);
+ swap(a.rows, b.rows);
+ swap(a.progressedPercentage, b.progressedPercentage);
+ swap(a.status, b.status);
+ swap(a.footerSummary, b.footerSummary);
+ swap(a.startTime, b.startTime);
+}
+
+TProgressUpdateResp::TProgressUpdateResp(const TProgressUpdateResp& other321) {
+ headerNames = other321.headerNames;
+ rows = other321.rows;
+ progressedPercentage = other321.progressedPercentage;
+ status = other321.status;
+ footerSummary = other321.footerSummary;
+ startTime = other321.startTime;
+}
+TProgressUpdateResp& TProgressUpdateResp::operator=(const TProgressUpdateResp& other322) {
+ headerNames = other322.headerNames;
+ rows = other322.rows;
+ progressedPercentage = other322.progressedPercentage;
+ status = other322.status;
+ footerSummary = other322.footerSummary;
+ startTime = other322.startTime;
+ return *this;
+}
+void TProgressUpdateResp::printTo(std::ostream& out) const {
+ using ::apache::thrift::to_string;
+ out << "TProgressUpdateResp(";
+ out << "headerNames=" << to_string(headerNames);
+ out << ", " << "rows=" << to_string(rows);
+ out << ", " << "progressedPercentage=" << to_string(progressedPercentage);
+ out << ", " << "status=" << to_string(status);
+ out << ", " << "footerSummary=" << to_string(footerSummary);
+ out << ", " << "startTime=" << to_string(startTime);
+ out << ")";
+}
+
}}}}} // namespace
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
index b249544..6c2bb34 100644
--- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
+++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h
@@ -175,6 +175,16 @@ struct TFetchOrientation {
extern const std::map<int, const char*> _TFetchOrientation_VALUES_TO_NAMES;
+struct TJobExecutionStatus {
+ enum type {
+ IN_PROGRESS = 0,
+ COMPLETE = 1,
+ NOT_AVAILABLE = 2
+ };
+};
+
+extern const std::map<int, const char*> _TJobExecutionStatus_VALUES_TO_NAMES;
+
typedef int32_t TTypeEntryPtr;
typedef std::string TIdentifier;
@@ -339,6 +349,8 @@ class TRenewDelegationTokenReq;
class TRenewDelegationTokenResp;
+class TProgressUpdateResp;
+
typedef struct _TTypeQualifierValue__isset {
_TTypeQualifierValue__isset() : i32Value(false), stringValue(false) {}
bool i32Value :1;
@@ -3669,24 +3681,37 @@ inline std::ostream& operator<<(std::ostream& out, const TGetCrossReferenceResp&
return out;
}
+typedef struct _TGetOperationStatusReq__isset {
+ _TGetOperationStatusReq__isset() : getProgressUpdate(false) {}
+ bool getProgressUpdate :1;
+} _TGetOperationStatusReq__isset;
class TGetOperationStatusReq {
public:
TGetOperationStatusReq(const TGetOperationStatusReq&);
TGetOperationStatusReq& operator=(const TGetOperationStatusReq&);
- TGetOperationStatusReq() {
+ TGetOperationStatusReq() : getProgressUpdate(0) {
}
virtual ~TGetOperationStatusReq() throw();
TOperationHandle operationHandle;
+ bool getProgressUpdate;
+
+ _TGetOperationStatusReq__isset __isset;
void __set_operationHandle(const TOperationHandle& val);
+ void __set_getProgressUpdate(const bool val);
+
bool operator == (const TGetOperationStatusReq & rhs) const
{
if (!(operationHandle == rhs.operationHandle))
return false;
+ if (__isset.getProgressUpdate != rhs.__isset.getProgressUpdate)
+ return false;
+ else if (__isset.getProgressUpdate && !(getProgressUpdate == rhs.getProgressUpdate))
+ return false;
return true;
}
bool operator != (const TGetOperationStatusReq &rhs) const {
@@ -3710,7 +3735,7 @@ inline std::ostream& operator<<(std::ostream& out, const TGetOperationStatusReq&
}
typedef struct _TGetOperationStatusResp__isset {
- _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false) {}
+ _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false), hasResultSet(false), progressUpdateResponse(false) {}
bool operationState :1;
bool sqlState :1;
bool errorCode :1;
@@ -3719,6 +3744,7 @@ typedef struct _TGetOperationStatusResp__isset {
bool operationStarted :1;
bool operationCompleted :1;
bool hasResultSet :1;
+ bool progressUpdateResponse :1;
} _TGetOperationStatusResp__isset;
class TGetOperationStatusResp {
@@ -3739,6 +3765,7 @@ class TGetOperationStatusResp {
int64_t operationStarted;
int64_t operationCompleted;
bool hasResultSet;
+ TProgressUpdateResp progressUpdateResponse;
_TGetOperationStatusResp__isset __isset;
@@ -3760,6 +3787,8 @@ class TGetOperationStatusResp {
void __set_hasResultSet(const bool val);
+ void __set_progressUpdateResponse(const TProgressUpdateResp& val);
+
bool operator == (const TGetOperationStatusResp & rhs) const
{
if (!(status == rhs.status))
@@ -3796,6 +3825,10 @@ class TGetOperationStatusResp {
return false;
else if (__isset.hasResultSet && !(hasResultSet == rhs.hasResultSet))
return false;
+ if (__isset.progressUpdateResponse != rhs.__isset.progressUpdateResponse)
+ return false;
+ else if (__isset.progressUpdateResponse && !(progressUpdateResponse == rhs.progressUpdateResponse))
+ return false;
return true;
}
bool operator != (const TGetOperationStatusResp &rhs) const {
@@ -4470,6 +4503,71 @@ inline std::ostream& operator<<(std::ostream& out, const TRenewDelegationTokenRe
return out;
}
+
+class TProgressUpdateResp {
+ public:
+
+ TProgressUpdateResp(const TProgressUpdateResp&);
+ TProgressUpdateResp& operator=(const TProgressUpdateResp&);
+ TProgressUpdateResp() : progressedPercentage(0), status((TJobExecutionStatus::type)0), footerSummary(), startTime(0) {
+ }
+
+ virtual ~TProgressUpdateResp() throw();
+ std::vector<std::string> headerNames;
+ std::vector<std::vector<std::string> > rows;
+ double progressedPercentage;
+ TJobExecutionStatus::type status;
+ std::string footerSummary;
+ int64_t startTime;
+
+ void __set_headerNames(const std::vector<std::string> & val);
+
+ void __set_rows(const std::vector<std::vector<std::string> > & val);
+
+ void __set_progressedPercentage(const double val);
+
+ void __set_status(const TJobExecutionStatus::type val);
+
+ void __set_footerSummary(const std::string& val);
+
+ void __set_startTime(const int64_t val);
+
+ bool operator == (const TProgressUpdateResp & rhs) const
+ {
+ if (!(headerNames == rhs.headerNames))
+ return false;
+ if (!(rows == rhs.rows))
+ return false;
+ if (!(progressedPercentage == rhs.progressedPercentage))
+ return false;
+ if (!(status == rhs.status))
+ return false;
+ if (!(footerSummary == rhs.footerSummary))
+ return false;
+ if (!(startTime == rhs.startTime))
+ return false;
+ return true;
+ }
+ bool operator != (const TProgressUpdateResp &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const TProgressUpdateResp & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+ virtual void printTo(std::ostream& out) const;
+};
+
+void swap(TProgressUpdateResp &a, TProgressUpdateResp &b);
+
+inline std::ostream& operator<<(std::ostream& out, const TProgressUpdateResp& obj)
+{
+ obj.printTo(out);
+ return out;
+}
+
}}}}} // namespace
#endif
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
----------------------------------------------------------------------
diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
index 84c64cd..af31ce2 100644
--- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
+++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusReq.java
@@ -39,6 +39,7 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TGetOperationStatusReq");
private static final org.apache.thrift.protocol.TField OPERATION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+ private static final org.apache.thrift.protocol.TField GET_PROGRESS_UPDATE_FIELD_DESC = new org.apache.thrift.protocol.TField("getProgressUpdate", org.apache.thrift.protocol.TType.BOOL, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -47,10 +48,12 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
}
private TOperationHandle operationHandle; // required
+ private boolean getProgressUpdate; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- OPERATION_HANDLE((short)1, "operationHandle");
+ OPERATION_HANDLE((short)1, "operationHandle"),
+ GET_PROGRESS_UPDATE((short)2, "getProgressUpdate");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -67,6 +70,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
switch(fieldId) {
case 1: // OPERATION_HANDLE
return OPERATION_HANDLE;
+ case 2: // GET_PROGRESS_UPDATE
+ return GET_PROGRESS_UPDATE;
default:
return null;
}
@@ -107,11 +112,16 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
}
// isset id assignments
+ private static final int __GETPROGRESSUPDATE_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.GET_PROGRESS_UPDATE};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.OPERATION_HANDLE, new org.apache.thrift.meta_data.FieldMetaData("operationHandle", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TOperationHandle.class)));
+ tmpMap.put(_Fields.GET_PROGRESS_UPDATE, new org.apache.thrift.meta_data.FieldMetaData("getProgressUpdate", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusReq.class, metaDataMap);
}
@@ -130,9 +140,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
* Performs a deep copy on <i>other</i>.
*/
public TGetOperationStatusReq(TGetOperationStatusReq other) {
+ __isset_bitfield = other.__isset_bitfield;
if (other.isSetOperationHandle()) {
this.operationHandle = new TOperationHandle(other.operationHandle);
}
+ this.getProgressUpdate = other.getProgressUpdate;
}
public TGetOperationStatusReq deepCopy() {
@@ -142,6 +154,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
@Override
public void clear() {
this.operationHandle = null;
+ setGetProgressUpdateIsSet(false);
+ this.getProgressUpdate = false;
}
public TOperationHandle getOperationHandle() {
@@ -167,6 +181,28 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
}
}
+ public boolean isGetProgressUpdate() {
+ return this.getProgressUpdate;
+ }
+
+ public void setGetProgressUpdate(boolean getProgressUpdate) {
+ this.getProgressUpdate = getProgressUpdate;
+ setGetProgressUpdateIsSet(true);
+ }
+
+ public void unsetGetProgressUpdate() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID);
+ }
+
+ /** Returns true if field getProgressUpdate is set (has been assigned a value) and false otherwise */
+ public boolean isSetGetProgressUpdate() {
+ return EncodingUtils.testBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID);
+ }
+
+ public void setGetProgressUpdateIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __GETPROGRESSUPDATE_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case OPERATION_HANDLE:
@@ -177,6 +213,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
}
break;
+ case GET_PROGRESS_UPDATE:
+ if (value == null) {
+ unsetGetProgressUpdate();
+ } else {
+ setGetProgressUpdate((Boolean)value);
+ }
+ break;
+
}
}
@@ -185,6 +229,9 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
case OPERATION_HANDLE:
return getOperationHandle();
+ case GET_PROGRESS_UPDATE:
+ return isGetProgressUpdate();
+
}
throw new IllegalStateException();
}
@@ -198,6 +245,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
switch (field) {
case OPERATION_HANDLE:
return isSetOperationHandle();
+ case GET_PROGRESS_UPDATE:
+ return isSetGetProgressUpdate();
}
throw new IllegalStateException();
}
@@ -224,6 +273,15 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
return false;
}
+ boolean this_present_getProgressUpdate = true && this.isSetGetProgressUpdate();
+ boolean that_present_getProgressUpdate = true && that.isSetGetProgressUpdate();
+ if (this_present_getProgressUpdate || that_present_getProgressUpdate) {
+ if (!(this_present_getProgressUpdate && that_present_getProgressUpdate))
+ return false;
+ if (this.getProgressUpdate != that.getProgressUpdate)
+ return false;
+ }
+
return true;
}
@@ -236,6 +294,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
if (present_operationHandle)
list.add(operationHandle);
+ boolean present_getProgressUpdate = true && (isSetGetProgressUpdate());
+ list.add(present_getProgressUpdate);
+ if (present_getProgressUpdate)
+ list.add(getProgressUpdate);
+
return list.hashCode();
}
@@ -257,6 +320,16 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetGetProgressUpdate()).compareTo(other.isSetGetProgressUpdate());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetGetProgressUpdate()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.getProgressUpdate, other.getProgressUpdate);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -284,6 +357,12 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
sb.append(this.operationHandle);
}
first = false;
+ if (isSetGetProgressUpdate()) {
+ if (!first) sb.append(", ");
+ sb.append("getProgressUpdate:");
+ sb.append(this.getProgressUpdate);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -310,6 +389,8 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -343,6 +424,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 2: // GET_PROGRESS_UPDATE
+ if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+ struct.getProgressUpdate = iprot.readBool();
+ struct.setGetProgressUpdateIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -361,6 +450,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
struct.operationHandle.write(oprot);
oprot.writeFieldEnd();
}
+ if (struct.isSetGetProgressUpdate()) {
+ oprot.writeFieldBegin(GET_PROGRESS_UPDATE_FIELD_DESC);
+ oprot.writeBool(struct.getProgressUpdate);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -379,6 +473,14 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusReq struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
struct.operationHandle.write(oprot);
+ BitSet optionals = new BitSet();
+ if (struct.isSetGetProgressUpdate()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetGetProgressUpdate()) {
+ oprot.writeBool(struct.getProgressUpdate);
+ }
}
@Override
@@ -387,6 +489,11 @@ public class TGetOperationStatusReq implements org.apache.thrift.TBase<TGetOpera
struct.operationHandle = new TOperationHandle();
struct.operationHandle.read(iprot);
struct.setOperationHandleIsSet(true);
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.getProgressUpdate = iprot.readBool();
+ struct.setGetProgressUpdateIsSet(true);
+ }
}
}