Posted to commits@hive.apache.org by mg...@apache.org on 2020/02/09 21:55:32 UTC
[hive] branch master updated: HIVE-22835 Extract Executor from Driver (Miklos Gergely, reviewed by Zoltan Haindrich)
This is an automated email from the ASF dual-hosted git repository.
mgergely pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0d9deba HIVE-22835 Extract Executor from Driver (Miklos Gergely, reviewed by Zoltan Haindrich)
0d9deba is described below
commit 0d9deba3c15038df4c64ea9b8494d554eb8eea2f
Author: miklosgergely <mg...@cloudera.com>
AuthorDate: Wed Feb 5 18:55:55 2020 +0100
HIVE-22835 Extract Executor from Driver (Miklos Gergely, reviewed by Zoltan Haindrich)
---
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 578 +-------------------
.../org/apache/hadoop/hive/ql/DriverContext.java | 23 +
.../java/org/apache/hadoop/hive/ql/Executor.java | 593 +++++++++++++++++++++
ql/src/java/org/apache/hadoop/hive/ql/IDriver.java | 2 +-
.../apache/hadoop/hive/ql/reexec/ReExecDriver.java | 4 +-
5 files changed, 635 insertions(+), 565 deletions(-)
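
In outline, the commit moves the body of Driver's private execute() into a new
Executor class, and relocates the shared state it needs (the result stream and
the HS2 operation id) into DriverContext. A minimal sketch of the new call
path, mirroring the Driver.java hunk below (taskQueue, context, driverContext
and driverState are existing Driver fields):

    taskQueue = new TaskQueue(context); // for canceling the query
    Executor executor = new Executor(context, driverContext, driverState, taskQueue);
    executor.execute(); // throws CommandProcessorException; the caller rolls back
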
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 5191800..1f8bc12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -18,14 +18,10 @@
package org.apache.hadoop.hive.ql;
-import java.io.DataInput;
import java.io.IOException;
-import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -53,26 +49,18 @@ import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
-import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
import org.apache.hadoop.hive.ql.ddl.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.DagUtils;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.TaskResult;
-import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
-import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.hooks.Entity;
-import org.apache.hadoop.hive.ql.hooks.HookContext;
-import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lock.CompileLock;
import org.apache.hadoop.hive.ql.lock.CompileLockFactory;
@@ -82,7 +70,6 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter;
@@ -102,7 +89,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.common.util.TxnIdUtils;
@@ -111,7 +97,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
public class Driver implements IDriver {
@@ -124,15 +109,11 @@ public class Driver implements IDriver {
private int maxRows = 100;
private ByteStream.Output bos = new ByteStream.Output();
- private DataInput resStream;
private Context context;
private final DriverContext driverContext;
private TaskQueue taskQueue;
private final List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
- // HS2 operation handle guid string
- private String operationId;
-
private DriverState driverState = new DriverState();
@Override
@@ -945,7 +926,9 @@ public class Driver implements IDriver {
}
try {
- execute();
+ taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?)
+ Executor executor = new Executor(context, driverContext, driverState, taskQueue);
+ executor.execute();
} catch (CommandProcessorException cpe) {
rollback(cpe);
throw cpe;
@@ -1081,535 +1064,6 @@ public class Driver implements IDriver {
return false;
}
- private void useFetchFromCache(CacheEntry cacheEntry) {
- // Change query FetchTask to use new location specified in results cache.
- FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork());
- fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context);
- driverContext.getPlan().setFetchTask(fetchTaskFromCache);
- driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry));
- }
-
- private void preExecutionCacheActions() throws Exception {
- if (driverContext.getCacheUsage() != null) {
- if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
- driverContext.getPlan().getFetchTask() != null) {
- ValidTxnWriteIdList txnWriteIdList = null;
- if (driverContext.getPlan().hasAcidResourcesInQuery()) {
- txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf());
- }
- // The results of this query execution might be cacheable.
- // Add a placeholder entry in the cache so other queries know this result is pending.
- CacheEntry pendingCacheEntry =
- QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList);
- if (pendingCacheEntry != null) {
- // Update cacheUsage to reference the pending entry.
- this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry);
- }
- }
- }
- }
-
- private void postExecutionCacheActions() throws Exception {
- if (driverContext.getCacheUsage() != null) {
- if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
- // Using a previously cached result.
- CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
-
- // Reader count already incremented during cache lookup.
- // Save to usedCacheEntry to ensure reader is released after query.
- driverContext.setUsedCacheEntry(cacheEntry);
- } else if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
- driverContext.getCacheUsage().getCacheEntry() != null &&
- driverContext.getPlan().getFetchTask() != null) {
- // Save results to the cache for future queries to use.
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
-
- ValidTxnWriteIdList txnWriteIdList = null;
- if (driverContext.getPlan().hasAcidResourcesInQuery()) {
- txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf());
- }
- CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
- boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(
- cacheEntry,
- driverContext.getPlan().getFetchTask().getWork());
- LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry);
- if (savedToCache) {
- useFetchFromCache(driverContext.getCacheUsage().getCacheEntry());
- // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released.
- driverContext.setUsedCacheEntry(driverContext.getCacheUsage().getCacheEntry());
- }
-
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
- }
- }
- }
-
- private void execute() throws CommandProcessorException {
- PerfLogger perfLogger = SessionState.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
-
- boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME));
-
- int maxlen;
- if ("spark".equals(driverContext.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
- maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH);
- } else {
- maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
- }
- Metrics metrics = MetricsFactory.getInstance();
-
- String queryId = driverContext.getPlan().getQueryId();
- // Get the query string from the conf file as the compileInternal() method might
- // hide sensitive information during query redaction.
- String queryStr = driverContext.getConf().getQueryString();
-
- driverState.lock();
- try {
- // if query is not in compiled state, or executing state which is carried over from
- // a combined compile/execute in runInternal, throws the error
- if (!driverState.isCompiled() && !driverState.isExecuting()) {
- String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " + queryStr;
- CONSOLE.printError(errorMessage);
- throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
- } else {
- driverState.executing();
- }
- } finally {
- driverState.unlock();
- }
-
- HookContext hookContext = null;
-
- // Whether there's any error occurred during query execution. Used for query lifetime hook.
- boolean executionError = false;
-
- try {
- LOG.info("Executing command(queryId=" + queryId + "): " + queryStr);
- // compile and execute can get called from different threads in case of HS2
- // so clear timing in this thread's Hive object before proceeding.
- Hive.get().clearMetaCallTiming();
-
- driverContext.getPlan().setStarted();
-
- if (SessionState.get() != null) {
- SessionState.get().getHiveHistory().startQuery(queryStr, queryId);
- SessionState.get().getHiveHistory().logPlanProgress(driverContext.getPlan());
- }
- resStream = null;
-
- SessionState ss = SessionState.get();
-
- // TODO: should this use getUserFromAuthenticator?
- hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(),
- context.getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(),
- InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(),
- ss.isHiveServerQuery(), perfLogger, driverContext.getQueryInfo(), context);
- hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
-
- driverContext.getHookRunner().runPreHooks(hookContext);
-
- // Trigger query hooks before query execution.
- driverContext.getHookRunner().runBeforeExecutionHook(queryStr, hookContext);
-
- setQueryDisplays(driverContext.getPlan().getRootTasks());
- int mrJobs = Utilities.getMRTasks(driverContext.getPlan().getRootTasks()).size();
- int jobs = mrJobs + Utilities.getTezTasks(driverContext.getPlan().getRootTasks()).size()
- + Utilities.getSparkTasks(driverContext.getPlan().getRootTasks()).size();
- if (jobs > 0) {
- logMrWarning(mrJobs);
- CONSOLE.printInfo("Query ID = " + queryId);
- CONSOLE.printInfo("Total jobs = " + jobs);
- }
- if (SessionState.get() != null) {
- SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
- String.valueOf(jobs));
- SessionState.get().getHiveHistory().setIdToTableMap(driverContext.getPlan().getIdToTableNameMap());
- }
- String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
-
- // A runtime that launches runnable tasks as separate Threads through
- // TaskRunners
- // As soon as a task isRunnable, it is put in a queue
- // At any time, at most maxthreads tasks can be running
- // The main thread polls the TaskRunners to check if they have finished.
-
- DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext, perfLogger);
-
- taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?)
- taskQueue.prepare(driverContext.getPlan());
-
- context.setHDFSCleanup(true);
-
- SessionState.get().setMapRedStats(new LinkedHashMap<>());
- SessionState.get().setStackTraces(new HashMap<>());
- SessionState.get().setLocalMapRedErrors(new HashMap<>());
-
- // Add root Tasks to runnable
- for (Task<?> tsk : driverContext.getPlan().getRootTasks()) {
- // This should never happen, if it does, it's a bug with the potential to produce
- // incorrect results.
- assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
- taskQueue.addToRunnable(tsk);
-
- if (metrics != null) {
- tsk.updateTaskMetrics(metrics);
- }
- }
-
- preExecutionCacheActions();
-
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
- // Loop while you either have tasks running, or tasks queued up
- while (taskQueue.isRunning()) {
- // Launch upto maxthreads tasks
- Task<?> task;
- int maxthreads = HiveConf.getIntVar(driverContext.getConf(), HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
- while ((task = taskQueue.getRunnable(maxthreads)) != null) {
- TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, taskQueue);
- if (!runner.isRunning()) {
- break;
- }
- }
-
- // poll the Tasks to see which one completed
- TaskRunner tskRun = taskQueue.pollFinished();
- if (tskRun == null) {
- continue;
- }
- /*
- This should be removed eventually. HIVE-17814 gives more detail
- explanation of whats happening and HIVE-17815 as to why this is done.
- Briefly for replication the graph is huge and so memory pressure is going to be huge if
- we keep a lot of references around.
- */
- String opName = driverContext.getPlan().getOperationName();
- boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName())
- || opName.equals(HiveOperation.REPLLOAD.getOperationName());
- if (!isReplicationOperation) {
- hookContext.addCompleteTask(tskRun);
- }
-
- driverContext.getQueryDisplay().setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult());
-
- Task<?> tsk = tskRun.getTask();
- TaskResult result = tskRun.getTaskResult();
-
- int exitVal = result.getExitVal();
- DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext,
- perfLogger);
-
- if (exitVal != 0) {
- Task<?> backupTask = tsk.getAndInitBackupTask();
- if (backupTask != null) {
- String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
- CONSOLE.printError(errorMessage);
- errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
- CONSOLE.printError(errorMessage);
-
- // add backup task to runnable
- if (TaskQueue.isLaunchable(backupTask)) {
- taskQueue.addToRunnable(backupTask);
- }
- continue;
-
- } else {
- String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
- if (taskQueue.isShutdown()) {
- errorMessage = "FAILED: Operation cancelled. " + errorMessage;
- }
- DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext,
- errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
- String sqlState = "08S01";
-
- // 08S01 (Communication error) is the default sql state. Override the sqlstate
- // based on the ErrorMsg set in HiveException.
- if (result.getTaskError() instanceof HiveException) {
- ErrorMsg errorMsg = ((HiveException) result.getTaskError()).
- getCanonicalErrorMsg();
- if (errorMsg != ErrorMsg.GENERIC_ERROR) {
- sqlState = errorMsg.getSQLState();
- }
- }
-
- CONSOLE.printError(errorMessage);
- taskQueue.shutdown();
- // in case we decided to run everything in local mode, restore the
- // the jobtracker setting to its initial value
- context.restoreOriginalTracker();
- throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState,
- result.getTaskError());
- }
- }
-
- taskQueue.finished(tskRun);
-
- if (SessionState.get() != null) {
- SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
- Keys.TASK_RET_CODE, String.valueOf(exitVal));
- SessionState.get().getHiveHistory().endTask(queryId, tsk);
- }
-
- if (tsk.getChildTasks() != null) {
- for (Task<?> child : tsk.getChildTasks()) {
- if (TaskQueue.isLaunchable(child)) {
- taskQueue.addToRunnable(child);
- }
- }
- }
- }
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
-
- postExecutionCacheActions();
-
- // in case we decided to run everything in local mode, restore the
- // the jobtracker setting to its initial value
- context.restoreOriginalTracker();
-
- if (taskQueue.isShutdown()) {
- String errorMessage = "FAILED: Operation cancelled";
- DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, null);
- CONSOLE.printError(errorMessage);
- throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
- }
-
- // remove incomplete outputs.
- // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
- // remove them
- HashSet<WriteEntity> remOutputs = new LinkedHashSet<WriteEntity>();
- for (WriteEntity output : driverContext.getPlan().getOutputs()) {
- if (!output.isComplete()) {
- remOutputs.add(output);
- }
- }
-
- for (WriteEntity output : remOutputs) {
- driverContext.getPlan().getOutputs().remove(output);
- }
-
-
- hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
-
- driverContext.getHookRunner().runPostExecHooks(hookContext);
-
- if (SessionState.get() != null) {
- SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
- String.valueOf(0));
- SessionState.get().getHiveHistory().printRowCount(queryId);
- }
- releasePlan(driverContext.getPlan());
- } catch (CommandProcessorException cpe) {
- executionError = true;
- throw cpe;
- } catch (Throwable e) {
- executionError = true;
-
- DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(),
- hookContext, perfLogger);
-
- context.restoreOriginalTracker();
- if (SessionState.get() != null) {
- SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
- String.valueOf(12));
- }
- // TODO: do better with handling types of Exception here
- String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
- if (hookContext != null) {
- try {
- DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, e);
- } catch (Exception t) {
- LOG.warn("Failed to invoke failure hook", t);
- }
- }
- CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
- throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e);
- } finally {
- // Trigger query hooks after query completes its execution.
- try {
- driverContext.getHookRunner().runAfterExecutionHook(queryStr, hookContext, executionError);
- } catch (Exception e) {
- LOG.warn("Failed when invoking query after execution hook", e);
- }
-
- if (SessionState.get() != null) {
- SessionState.get().getHiveHistory().endQuery(queryId);
- }
- if (noName) {
- driverContext.getConf().set(MRJobConfig.JOB_NAME, "");
- }
- double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00;
-
- ImmutableMap<String, Long> executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution");
- driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);
-
- Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
- if (stats != null && !stats.isEmpty()) {
- long totalCpu = 0;
- long numModifiedRows = 0;
- CONSOLE.printInfo("MapReduce Jobs Launched: ");
- for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
- CONSOLE.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
- totalCpu += entry.getValue().getCpuMSec();
-
- if (numModifiedRows > -1) {
- //if overflow, then numModifiedRows is set as -1. Else update numModifiedRows with the sum.
- numModifiedRows = addWithOverflowCheck(numModifiedRows, entry.getValue().getNumModifiedRows());
- }
- }
- driverContext.getQueryState().setNumModifiedRows(numModifiedRows);
- CONSOLE.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
- }
- SparkSession ss = SessionState.get().getSparkSession();
- if (ss != null) {
- ss.onQueryCompletion(queryId);
- }
- driverState.lock();
- try {
- driverState.executionFinished(executionError);
- } finally {
- driverState.unlock();
- }
- if (driverState.isAborted()) {
- LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
- } else {
- LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
- }
- }
- }
-
- private long addWithOverflowCheck(long val1, long val2) {
- try {
- return Math.addExact(val1, val2);
- } catch (ArithmeticException e) {
- return -1;
- }
- }
-
- private void releasePlan(QueryPlan plan) {
- // Plan maybe null if Driver.close is called in another thread for the same Driver object
- driverState.lock();
- try {
- if (plan != null) {
- plan.setDone();
- if (SessionState.get() != null) {
- try {
- SessionState.get().getHiveHistory().logPlanProgress(plan);
- } catch (Exception e) {
- // Log and ignore
- LOG.warn("Could not log query plan progress", e);
- }
- }
- }
- } finally {
- driverState.unlock();
- }
- }
-
- private void setQueryDisplays(List<Task<?>> tasks) {
- if (tasks != null) {
- Set<Task<?>> visited = new HashSet<Task<?>>();
- while (!tasks.isEmpty()) {
- tasks = setQueryDisplays(tasks, visited);
- }
- }
- }
-
- private List<Task<?>> setQueryDisplays(
- List<Task<?>> tasks,
- Set<Task<?>> visited) {
- List<Task<?>> childTasks = new ArrayList<>();
- for (Task<?> task : tasks) {
- if (visited.contains(task)) {
- continue;
- }
- task.setQueryDisplay(driverContext.getQueryDisplay());
- if (task.getDependentTasks() != null) {
- childTasks.addAll(task.getDependentTasks());
- }
- visited.add(task);
- }
- return childTasks;
- }
-
- private void logMrWarning(int mrJobs) {
- if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(driverContext.getConf(), ConfVars.HIVE_EXECUTION_ENGINE)))) {
- return;
- }
- String warning = HiveConf.generateMrDeprecationWarning();
- LOG.warn(warning);
- }
-
- private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task tsk) {
- String errorMessage = "FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName();
- if (downstreamError != null) {
- //here we assume that upstream code may have parametrized the msg from ErrorMsg
- //so we want to keep it
- if (downstreamError.getMessage() != null) {
- errorMessage += ". " + downstreamError.getMessage();
- } else {
- errorMessage += ". " + StringUtils.stringifyException(downstreamError);
- }
- }
- else {
- ErrorMsg em = ErrorMsg.getErrorMsg(exitVal);
- if (em != null) {
- errorMessage += ". " + em.getMsg();
- }
- }
-
- return errorMessage;
- }
-
- /**
- * Launches a new task
- *
- * @param tsk
- * task being launched
- * @param queryId
- * Id of the query containing the task
- * @param noName
- * whether the task has a name set
- * @param jobname
- * name of the task, if it is a map-reduce job
- * @param jobs
- * number of map-reduce jobs
- * @param taskQueue
- * the task queue
- */
- private TaskRunner launchTask(Task<?> tsk, String queryId, boolean noName,
- String jobname, int jobs, TaskQueue taskQueue) throws HiveException {
- if (SessionState.get() != null) {
- SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
- }
- if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
- if (noName) {
- driverContext.getConf().set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
- }
- driverContext.getConf().set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());
- Utilities.setWorkflowAdjacencies(driverContext.getConf(), driverContext.getPlan());
- taskQueue.incCurJobNo(1);
- CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobs);
- }
- tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context);
- TaskRunner tskRun = new TaskRunner(tsk, taskQueue);
-
- taskQueue.launching(tskRun);
- // Launch Task
- if (HiveConf.getBoolVar(tsk.getConf(), HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) {
- // Launch it in the parallel mode, as a separate thread only for MR tasks
- if (LOG.isInfoEnabled()){
- LOG.info("Starting task [" + tsk + "] in parallel");
- }
- tskRun.start();
- } else {
- if (LOG.isInfoEnabled()){
- LOG.info("Starting task [" + tsk + "] in serial mode");
- }
- tskRun.runSequential();
- }
- return tskRun;
- }
-
@Override
public boolean isFetchingTable() {
return driverContext.getFetchTask() != null;
@@ -1635,10 +1089,10 @@ public class Driver implements IDriver {
return driverContext.getFetchTask().fetch(res);
}
- if (resStream == null) {
- resStream = context.getStream();
+ if (driverContext.getResStream() == null) {
+ driverContext.setResStream(context.getStream());
}
- if (resStream == null) {
+ if (driverContext.getResStream() == null) {
return false;
}
@@ -1646,7 +1100,7 @@ public class Driver implements IDriver {
String row = null;
while (numRows < maxRows) {
- if (resStream == null) {
+ if (driverContext.getResStream() == null) {
if (numRows > 0) {
return true;
} else {
@@ -1657,7 +1111,7 @@ public class Driver implements IDriver {
bos.reset();
Utilities.StreamStatus ss;
try {
- ss = Utilities.readColumn(resStream, bos);
+ ss = Utilities.readColumn(driverContext.getResStream(), bos);
if (bos.getLength() > 0) {
row = new String(bos.getData(), 0, bos.getLength(), "UTF-8");
} else if (ss == Utilities.StreamStatus.TERMINATED) {
@@ -1675,7 +1129,7 @@ public class Driver implements IDriver {
}
if (ss == Utilities.StreamStatus.EOF) {
- resStream = context.getStream();
+ driverContext.setResStream(context.getStream());
}
}
return true;
@@ -1696,7 +1150,7 @@ public class Driver implements IDriver {
driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, context);
} else {
context.resetStream();
- resStream = null;
+ driverContext.setResStream(null);
}
}
@@ -1756,9 +1210,9 @@ public class Driver implements IDriver {
private void releaseResStream() {
try {
- if (resStream != null) {
- ((FSDataInputStream) resStream).close();
- resStream = null;
+ if (driverContext.getResStream() != null) {
+ ((FSDataInputStream) driverContext.getResStream()).close();
+ driverContext.setResStream(null);
}
} catch (Exception e) {
LOG.debug(" Exception while closing the resStream ", e);
@@ -1883,11 +1337,11 @@ public class Driver implements IDriver {
/**
* Set the HS2 operation handle's guid string
- * @param opId base64 encoded guid string
+ * @param operationId base64 encoded guid string
*/
@Override
- public void setOperationId(String opId) {
- this.operationId = opId;
+ public void setOperationId(String operationId) {
+ driverContext.setOperationId(operationId);
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index bbf7fe5..a8c83fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql;
+import java.io.DataInput;
+
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Schema;
@@ -71,6 +73,11 @@ public class DriverContext {
private Context backupContext = null;
private boolean retrial = false;
+ private DataInput resStream;
+
+ // HS2 operation handle guid string
+ private String operationId;
+
public DriverContext(QueryState queryState, QueryInfo queryInfo, HookRunner hookRunner,
HiveTxnManager initTxnManager) {
this.queryState = queryState;
@@ -215,4 +222,20 @@ public class DriverContext {
public void setRetrial(boolean retrial) {
this.retrial = retrial;
}
+
+ public DataInput getResStream() {
+ return resStream;
+ }
+
+ public void setResStream(DataInput resStream) {
+ this.resStream = resStream;
+ }
+
+ public String getOperationId() {
+ return operationId;
+ }
+
+ public void setOperationId(String operationId) {
+ this.operationId = operationId;
+ }
}
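
With resStream and operationId now owned by DriverContext, both Driver and
Executor reach them through these accessors instead of private Driver fields.
A condensed illustration (not a verbatim excerpt) of how the updated
Driver.getResults() hunks above use the shared stream; bos is Driver's
existing ByteStream.Output buffer:

    // Lazily pick up the result stream from the Context, then read a column from it.
    if (driverContext.getResStream() == null) {
      driverContext.setResStream(context.getStream());
    }
    Utilities.StreamStatus ss = Utilities.readColumn(driverContext.getResStream(), bos);
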
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Executor.java b/ql/src/java/org/apache/hadoop/hive/ql/Executor.java
new file mode 100644
index 0000000..e9909a9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Executor.java
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.TaskResult;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Executes the Query Plan.
+ */
+public class Executor {
+ private static final String CLASS_NAME = Driver.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ private static final LogHelper CONSOLE = new LogHelper(LOG);
+
+ private final Context context;
+ private final DriverContext driverContext;
+ private final DriverState driverState;
+ private final TaskQueue taskQueue;
+
+ private HookContext hookContext;
+
+ public Executor(Context context, DriverContext driverContext, DriverState driverState, TaskQueue taskQueue) {
+ this.context = context;
+ this.driverContext = driverContext;
+ this.driverState = driverState;
+ this.taskQueue = taskQueue;
+ }
+
+ public void execute() throws CommandProcessorException {
+ SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
+
+ boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME));
+
+ checkState();
+
+ // Whether any error occurred during query execution. Used for the query lifetime hook.
+ boolean executionError = false;
+
+ try {
+ LOG.info("Executing command(queryId=" + driverContext.getQueryId() + "): " + driverContext.getQueryString());
+
+ // TODO: should this use getUserFromAuthenticator?
+ hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(),
+ context.getPathToCS(), SessionState.get().getUserName(), SessionState.get().getUserIpAddress(),
+ InetAddress.getLocalHost().getHostAddress(), driverContext.getOperationId(),
+ SessionState.get().getSessionId(), Thread.currentThread().getName(), SessionState.get().isHiveServerQuery(),
+ SessionState.getPerfLogger(), driverContext.getQueryInfo(), context);
+
+ preExecutionActions();
+ preExecutionCacheActions();
+ runTasks(noName);
+ postExecutionCacheActions();
+ postExecutionActions();
+ } catch (CommandProcessorException cpe) {
+ executionError = true;
+ throw cpe;
+ } catch (Throwable e) {
+ executionError = true;
+ DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(),
+ hookContext, SessionState.getPerfLogger());
+ handleException(hookContext, e);
+ } finally {
+ cleanUp(noName, hookContext, executionError);
+ }
+ }
+
+ private void checkState() throws CommandProcessorException {
+ driverState.lock();
+ try {
+ // If the query is not in the compiled state, or in the executing state carried over
+ // from a combined compile/execute in runInternal, throw an error.
+ if (!driverState.isCompiled() && !driverState.isExecuting()) {
+ String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " +
+ driverContext.getQueryString();
+ CONSOLE.printError(errorMessage);
+ throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
+ } else {
+ driverState.executing();
+ }
+ } finally {
+ driverState.unlock();
+ }
+ }
+
+ private void preExecutionActions() throws Exception {
+ // Compile and execute can get called from different threads in case of HS2,
+ // so clear the timing in this thread's Hive object before proceeding.
+ Hive.get().clearMetaCallTiming();
+
+ driverContext.getPlan().setStarted();
+
+ SessionState.get().getHiveHistory().startQuery(driverContext.getQueryString(), driverContext.getQueryId());
+ SessionState.get().getHiveHistory().logPlanProgress(driverContext.getPlan());
+ driverContext.setResStream(null);
+
+ hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
+ driverContext.getHookRunner().runPreHooks(hookContext);
+
+ // Trigger query hooks before query execution.
+ driverContext.getHookRunner().runBeforeExecutionHook(driverContext.getQueryString(), hookContext);
+
+ setQueryDisplays(driverContext.getPlan().getRootTasks());
+
+ // A runtime that launches runnable tasks as separate Threads through TaskRunners
+ // As soon as a task isRunnable, it is put in a queue
+ // At any time, at most maxthreads tasks can be running
+ // The main thread polls the TaskRunners to check if they have finished.
+
+ DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext,
+ SessionState.getPerfLogger());
+
+ taskQueue.prepare(driverContext.getPlan());
+
+ context.setHDFSCleanup(true);
+
+ SessionState.get().setMapRedStats(new LinkedHashMap<>());
+ SessionState.get().setStackTraces(new HashMap<>());
+ SessionState.get().setLocalMapRedErrors(new HashMap<>());
+
+ // Add root Tasks to runnable
+ Metrics metrics = MetricsFactory.getInstance();
+ for (Task<?> task : driverContext.getPlan().getRootTasks()) {
+ // This should never happen, if it does, it's a bug with the potential to produce
+ // incorrect results.
+ assert task.getParentTasks() == null || task.getParentTasks().isEmpty();
+ taskQueue.addToRunnable(task);
+
+ if (metrics != null) {
+ task.updateTaskMetrics(metrics);
+ }
+ }
+ }
+
+ private void setQueryDisplays(List<Task<?>> tasks) {
+ if (tasks != null) {
+ Set<Task<?>> visited = new HashSet<Task<?>>();
+ while (!tasks.isEmpty()) {
+ tasks = setQueryDisplays(tasks, visited);
+ }
+ }
+ }
+
+ private List<Task<?>> setQueryDisplays(List<Task<?>> tasks, Set<Task<?>> visited) {
+ List<Task<?>> childTasks = new ArrayList<>();
+ for (Task<?> task : tasks) {
+ if (visited.contains(task)) {
+ continue;
+ }
+ task.setQueryDisplay(driverContext.getQueryDisplay());
+ if (task.getDependentTasks() != null) {
+ childTasks.addAll(task.getDependentTasks());
+ }
+ visited.add(task);
+ }
+ return childTasks;
+ }
+
+ private void preExecutionCacheActions() throws Exception {
+ if (driverContext.getCacheUsage() == null) {
+ return;
+ }
+
+ if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+ driverContext.getPlan().getFetchTask() != null) {
+ ValidTxnWriteIdList txnWriteIdList = null;
+ if (driverContext.getPlan().hasAcidResourcesInQuery()) {
+ txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf());
+ }
+ // The results of this query execution might be cacheable.
+ // Add a placeholder entry in the cache so other queries know this result is pending.
+ CacheEntry pendingCacheEntry =
+ QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList);
+ if (pendingCacheEntry != null) {
+ // Update cacheUsage to reference the pending entry.
+ this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry);
+ }
+ }
+ }
+
+ private void runTasks(boolean noName) throws Exception {
+ SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
+
+ int jobCount = getJobCount();
+ String jobName = getJobName();
+
+ // Loop while you either have tasks running, or tasks queued up
+ while (taskQueue.isRunning()) {
+ launchTasks(noName, jobCount, jobName);
+ handleFinished();
+ }
+
+ SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
+ }
+
+ private void handleFinished() throws Exception {
+ // poll the Tasks to see which one completed
+ TaskRunner taskRun = taskQueue.pollFinished();
+ if (taskRun == null) {
+ return;
+ }
+ /*
+ This should be removed eventually. HIVE-17814 gives a more detailed
+ explanation of what's happening, and HIVE-17815 explains why this is done.
+ Briefly: for replication the graph is huge, so memory pressure is going to be huge if
+ we keep a lot of references around.
+ */
+ String opName = driverContext.getPlan().getOperationName();
+ boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName())
+ || opName.equals(HiveOperation.REPLLOAD.getOperationName());
+ if (!isReplicationOperation) {
+ hookContext.addCompleteTask(taskRun);
+ }
+
+ driverContext.getQueryDisplay().setTaskResult(taskRun.getTask().getId(), taskRun.getTaskResult());
+
+ Task<?> task = taskRun.getTask();
+ TaskResult result = taskRun.getTaskResult();
+
+ int exitVal = result.getExitVal();
+ DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext,
+ SessionState.getPerfLogger());
+
+ if (exitVal != 0) {
+ handleTaskFailure(task, result, exitVal);
+ return;
+ }
+
+ taskQueue.finished(taskRun);
+
+ SessionState.get().getHiveHistory().setTaskProperty(driverContext.getQueryId(), task.getId(),
+ Keys.TASK_RET_CODE, String.valueOf(exitVal));
+ SessionState.get().getHiveHistory().endTask(driverContext.getQueryId(), task);
+
+ if (task.getChildTasks() != null) {
+ for (Task<?> child : task.getChildTasks()) {
+ if (TaskQueue.isLaunchable(child)) {
+ taskQueue.addToRunnable(child);
+ }
+ }
+ }
+ }
+
+ private String getJobName() {
+ int maxlen;
+ if ("spark".equals(driverContext.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+ maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH);
+ } else {
+ maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+ }
+ return Utilities.abbreviate(driverContext.getQueryString(), maxlen - 6);
+ }
+
+ private int getJobCount() {
+ int mrJobCount = Utilities.getMRTasks(driverContext.getPlan().getRootTasks()).size();
+ int jobCount = mrJobCount + Utilities.getTezTasks(driverContext.getPlan().getRootTasks()).size()
+ + Utilities.getSparkTasks(driverContext.getPlan().getRootTasks()).size();
+ if (jobCount > 0) {
+ if (mrJobCount > 0 && "mr".equals(HiveConf.getVar(driverContext.getConf(), ConfVars.HIVE_EXECUTION_ENGINE))) {
+ LOG.warn(HiveConf.generateMrDeprecationWarning());
+ }
+ CONSOLE.printInfo("Query ID = " + driverContext.getPlan().getQueryId());
+ CONSOLE.printInfo("Total jobs = " + jobCount);
+ }
+ if (SessionState.get() != null) {
+ SessionState.get().getHiveHistory().setQueryProperty(driverContext.getPlan().getQueryId(), Keys.QUERY_NUM_TASKS,
+ String.valueOf(jobCount));
+ SessionState.get().getHiveHistory().setIdToTableMap(driverContext.getPlan().getIdToTableNameMap());
+ }
+ return jobCount;
+ }
+
+ private void launchTasks(boolean noName, int jobCount, String jobName) throws HiveException {
+ // Launch up to maxthreads tasks
+ Task<?> task;
+ int maxthreads = HiveConf.getIntVar(driverContext.getConf(), HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
+ while ((task = taskQueue.getRunnable(maxthreads)) != null) {
+ TaskRunner runner = launchTask(task, noName, jobName, jobCount);
+ if (!runner.isRunning()) {
+ break;
+ }
+ }
+ }
+
+ private TaskRunner launchTask(Task<?> task, boolean noName, String jobName, int jobCount) throws HiveException {
+ SessionState.get().getHiveHistory().startTask(driverContext.getQueryId(), task, task.getClass().getName());
+
+ if (task.isMapRedTask() && !(task instanceof ConditionalTask)) {
+ if (noName) {
+ driverContext.getConf().set(MRJobConfig.JOB_NAME, jobName + " (" + task.getId() + ")");
+ }
+ driverContext.getConf().set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, task.getId());
+ Utilities.setWorkflowAdjacencies(driverContext.getConf(), driverContext.getPlan());
+ taskQueue.incCurJobNo(1);
+ CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobCount);
+ }
+
+ task.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context);
+ TaskRunner taskRun = new TaskRunner(task, taskQueue);
+ taskQueue.launching(taskRun);
+
+ if (HiveConf.getBoolVar(task.getConf(), HiveConf.ConfVars.EXECPARALLEL) && task.canExecuteInParallel()) {
+ LOG.info("Starting task [" + task + "] in parallel");
+ taskRun.start();
+ } else {
+ LOG.info("Starting task [" + task + "] in serial mode");
+ taskRun.runSequential();
+ }
+ return taskRun;
+ }
+
+ private void handleTaskFailure(Task<?> task, TaskResult result, int exitVal)
+ throws HiveException, Exception, CommandProcessorException {
+ Task<?> backupTask = task.getAndInitBackupTask();
+ if (backupTask != null) {
+ String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), task);
+ CONSOLE.printError(errorMessage);
+ errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
+ CONSOLE.printError(errorMessage);
+
+ // add backup task to runnable
+ if (TaskQueue.isLaunchable(backupTask)) {
+ taskQueue.addToRunnable(backupTask);
+ }
+ } else {
+ String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), task);
+ if (taskQueue.isShutdown()) {
+ errorMessage = "FAILED: Operation cancelled. " + errorMessage;
+ }
+ DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext,
+ errorMessage + Strings.nullToEmpty(task.getDiagnosticsMessage()), result.getTaskError());
+ String sqlState = "08S01";
+
+ // 08S01 (Communication error) is the default SQL state. Override the SQL state
+ // based on the ErrorMsg set in the HiveException.
+ if (result.getTaskError() instanceof HiveException) {
+ ErrorMsg errorMsg = ((HiveException) result.getTaskError()).
+ getCanonicalErrorMsg();
+ if (errorMsg != ErrorMsg.GENERIC_ERROR) {
+ sqlState = errorMsg.getSQLState();
+ }
+ }
+
+ CONSOLE.printError(errorMessage);
+ taskQueue.shutdown();
+ // In case we decided to run everything in local mode, restore the
+ // jobtracker setting to its initial value.
+ context.restoreOriginalTracker();
+ throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState,
+ result.getTaskError());
+ }
+ }
+
+ private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task<?> task) {
+ String errorMessage = "FAILED: Execution Error, return code " + exitVal + " from " + task.getClass().getName();
+ if (downstreamError != null) {
+ // Here we assume that upstream code may have parameterized the msg from ErrorMsg, so we want to keep it
+ if (downstreamError.getMessage() != null) {
+ errorMessage += ". " + downstreamError.getMessage();
+ } else {
+ errorMessage += ". " + StringUtils.stringifyException(downstreamError);
+ }
+ } else {
+ ErrorMsg em = ErrorMsg.getErrorMsg(exitVal);
+ if (em != null) {
+ errorMessage += ". " + em.getMsg();
+ }
+ }
+
+ return errorMessage;
+ }
+
+ private void postExecutionCacheActions() throws Exception {
+ if (driverContext.getCacheUsage() == null) {
+ return;
+ }
+
+ if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
+ // Using a previously cached result.
+ CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
+
+ // Reader count already incremented during cache lookup.
+ // Save to usedCacheEntry to ensure reader is released after query.
+ driverContext.setUsedCacheEntry(cacheEntry);
+ } else if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+ driverContext.getCacheUsage().getCacheEntry() != null && driverContext.getPlan().getFetchTask() != null) {
+ // Save results to the cache for future queries to use.
+ SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
+
+ CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
+ boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(cacheEntry,
+ driverContext.getPlan().getFetchTask().getWork());
+ LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry);
+ if (savedToCache) {
+ useFetchFromCache(driverContext.getCacheUsage().getCacheEntry());
+ // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released.
+ driverContext.setUsedCacheEntry(driverContext.getCacheUsage().getCacheEntry());
+ }
+
+ SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
+ }
+ }
+
+ private void useFetchFromCache(CacheEntry cacheEntry) {
+ // Change query FetchTask to use new location specified in results cache.
+ FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork());
+ fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context);
+ driverContext.getPlan().setFetchTask(fetchTaskFromCache);
+ driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry));
+ }
+
+ private void postExecutionActions() throws Exception {
+ // In case we decided to run everything in local mode, restore the jobtracker setting to its initial value
+ context.restoreOriginalTracker();
+
+ if (taskQueue.isShutdown()) {
+ String errorMessage = "FAILED: Operation cancelled";
+ DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, errorMessage, null);
+ CONSOLE.printError(errorMessage);
+ throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
+ }
+
+ // Remove incomplete outputs.
+ // Some incomplete outputs may be added at the beginning, e.g. for dynamic partitions; remove them.
+ driverContext.getPlan().getOutputs().removeIf(x -> !x.isComplete());
+
+ hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
+ driverContext.getHookRunner().runPostExecHooks(hookContext);
+
+ SessionState.get().getHiveHistory().setQueryProperty(driverContext.getQueryId(), Keys.QUERY_RET_CODE,
+ String.valueOf(0));
+ SessionState.get().getHiveHistory().printRowCount(driverContext.getQueryId());
+ releasePlan(driverContext.getPlan());
+ }
+
+ private void releasePlan(QueryPlan plan) {
+ // The plan may be null if Driver.close is called in another thread for the same Driver object
+ driverState.lock();
+ try {
+ if (plan != null) {
+ plan.setDone();
+ if (SessionState.get() != null) {
+ try {
+ SessionState.get().getHiveHistory().logPlanProgress(plan);
+ } catch (Exception e) {
+ // Log and ignore
+ LOG.warn("Could not log query plan progress", e);
+ }
+ }
+ }
+ } finally {
+ driverState.unlock();
+ }
+ }
+
+ private void handleException(HookContext hookContext, Throwable e) throws CommandProcessorException {
+ context.restoreOriginalTracker();
+ if (SessionState.get() != null) {
+ SessionState.get().getHiveHistory().setQueryProperty(driverContext.getQueryId(), Keys.QUERY_RET_CODE,
+ String.valueOf(12));
+ }
+ // TODO: do better with handling types of Exception here
+ String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
+ if (hookContext != null) {
+ try {
+ DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, errorMessage, e);
+ } catch (Exception t) {
+ LOG.warn("Failed to invoke failure hook", t);
+ }
+ }
+ CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
+ throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e);
+ }
+
+ private void cleanUp(boolean noName, HookContext hookContext, boolean executionError) {
+ // Trigger query hooks after query completes its execution.
+ try {
+ driverContext.getHookRunner().runAfterExecutionHook(driverContext.getQueryString(), hookContext, executionError);
+ } catch (Exception e) {
+ LOG.warn("Failed when invoking query after execution hook", e);
+ }
+
+ SessionState.get().getHiveHistory().endQuery(driverContext.getQueryId());
+ if (noName) {
+ driverContext.getConf().set(MRJobConfig.JOB_NAME, "");
+ }
+ double duration = SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE) / 1000.00;
+
+ ImmutableMap<String, Long> executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution");
+ driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);
+
+ logExecutionResourceUsage();
+
+ if (SessionState.get().getSparkSession() != null) {
+ SessionState.get().getSparkSession().onQueryCompletion(driverContext.getQueryId());
+ }
+
+ driverState.lock();
+ try {
+ driverState.executionFinished(executionError);
+ } finally {
+ driverState.unlock();
+ }
+
+ if (driverState.isAborted()) {
+ LOG.info("Executing command(queryId={}) has been interrupted after {} seconds", driverContext.getQueryId(),
+ duration);
+ } else {
+ LOG.info("Completed executing command(queryId={}); Time taken: {} seconds", driverContext.getQueryId(), duration);
+ }
+ }
+
+ private void logExecutionResourceUsage() {
+ Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
+ if (stats != null && !stats.isEmpty()) {
+ long totalCpu = 0;
+ long numModifiedRows = 0;
+ CONSOLE.printInfo("MapReduce Jobs Launched: ");
+ for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
+ CONSOLE.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
+ totalCpu += entry.getValue().getCpuMSec();
+
+ if (numModifiedRows > -1) {
+ // On overflow numModifiedRows is set to -1, otherwise it is updated with the sum.
+ try {
+ numModifiedRows = Math.addExact(numModifiedRows, entry.getValue().getNumModifiedRows());
+ } catch (ArithmeticException e) {
+ numModifiedRows = -1;
+ }
+ }
+ }
+ driverContext.getQueryState().setNumModifiedRows(numModifiedRows);
+ CONSOLE.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
+ }
+ }
+}
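
Read top to bottom, the new Executor.execute() replaces the old ~530-line
Driver.execute() with a fixed pipeline around the task loop; a condensed view
of its control flow, using the method names defined above:

    preExecutionActions();       // hooks, history, TaskQueue.prepare, queue root tasks
    preExecutionCacheActions();  // add a pending results-cache entry, if cacheable
    runTasks(noName);            // launch/poll loop until the TaskQueue stops running
    postExecutionCacheActions(); // consume or publish the results-cache entry
    postExecutionActions();      // post-exec hooks, prune incomplete outputs, release plan
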
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java
index baad269..cd05f52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java
@@ -45,7 +45,7 @@ public interface IDriver extends CommandProcessor {
QueryDisplay getQueryDisplay();
- void setOperationId(String guid64);
+ void setOperationId(String operationId);
CommandProcessorResponse run() throws CommandProcessorException;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
index eab7f45..c307085 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
@@ -141,8 +141,8 @@ public class ReExecDriver implements IDriver {
}
@Override
- public void setOperationId(String guid64) {
- coreDriver.setOperationId(guid64);
+ public void setOperationId(String operationId) {
+ coreDriver.setOperationId(operationId);
}
@Override