You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mg...@apache.org on 2020/02/09 21:55:32 UTC

[hive] branch master updated: HIVE-22835 Extract Executor from Driver (Miklos Gergely, reviewed by Zoltan Haindrich)

This is an automated email from the ASF dual-hosted git repository.

mgergely pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d9deba  HIVE-22835 Extract Executor from Driver (Miklos Gergely, reviewed by Zoltan Haindrich)
0d9deba is described below

commit 0d9deba3c15038df4c64ea9b8494d554eb8eea2f
Author: miklosgergely <mg...@cloudera.com>
AuthorDate: Wed Feb 5 18:55:55 2020 +0100

    HIVE-22835 Extract Executor from Driver (Miklos Gergely, reviewed by Zoltan Haindrich)
---
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  | 578 +-------------------
 .../org/apache/hadoop/hive/ql/DriverContext.java   |  23 +
 .../java/org/apache/hadoop/hive/ql/Executor.java   | 593 +++++++++++++++++++++
 ql/src/java/org/apache/hadoop/hive/ql/IDriver.java |   2 +-
 .../apache/hadoop/hive/ql/reexec/ReExecDriver.java |   4 +-
 5 files changed, 635 insertions(+), 565 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 5191800..1f8bc12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -18,14 +18,10 @@
 
 package org.apache.hadoop.hive.ql;
 
-import java.io.DataInput;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -53,26 +49,18 @@ import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
-import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
 import org.apache.hadoop.hive.ql.ddl.DDLDesc.DDLDescWithWriteId;
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.DagUtils;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.TaskResult;
-import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
-import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.hooks.Entity;
-import org.apache.hadoop.hive.ql.hooks.HookContext;
-import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lock.CompileLock;
 import org.apache.hadoop.hive.ql.lock.CompileLockFactory;
@@ -82,7 +70,6 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.formatting.JsonMetaDataFormatter;
@@ -102,7 +89,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.ql.wm.WmContext;
 import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.common.util.TxnIdUtils;
@@ -111,7 +97,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
 
 public class Driver implements IDriver {
 
@@ -124,15 +109,11 @@ public class Driver implements IDriver {
   private int maxRows = 100;
   private ByteStream.Output bos = new ByteStream.Output();
 
-  private DataInput resStream;
   private Context context;
   private final DriverContext driverContext;
   private TaskQueue taskQueue;
   private final List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
 
-  // HS2 operation handle guid string
-  private String operationId;
-
   private DriverState driverState = new DriverState();
 
   @Override
@@ -945,7 +926,9 @@ public class Driver implements IDriver {
       }
 
       try {
-        execute();
+        taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?)
+        Executor executor = new Executor(context, driverContext, driverState, taskQueue);
+        executor.execute();
       } catch (CommandProcessorException cpe) {
         rollback(cpe);
         throw cpe;
@@ -1081,535 +1064,6 @@ public class Driver implements IDriver {
     return false;
   }
 
-  private void useFetchFromCache(CacheEntry cacheEntry) {
-    // Change query FetchTask to use new location specified in results cache.
-    FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork());
-    fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context);
-    driverContext.getPlan().setFetchTask(fetchTaskFromCache);
-    driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry));
-  }
-
-  private void preExecutionCacheActions() throws Exception {
-    if (driverContext.getCacheUsage() != null) {
-      if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
-          driverContext.getPlan().getFetchTask() != null) {
-        ValidTxnWriteIdList txnWriteIdList = null;
-        if (driverContext.getPlan().hasAcidResourcesInQuery()) {
-          txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf());
-        }
-        // The results of this query execution might be cacheable.
-        // Add a placeholder entry in the cache so other queries know this result is pending.
-        CacheEntry pendingCacheEntry =
-            QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList);
-        if (pendingCacheEntry != null) {
-          // Update cacheUsage to reference the pending entry.
-          this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry);
-        }
-      }
-    }
-  }
-
-  private void postExecutionCacheActions() throws Exception {
-    if (driverContext.getCacheUsage() != null) {
-      if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
-        // Using a previously cached result.
-        CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
-
-        // Reader count already incremented during cache lookup.
-        // Save to usedCacheEntry to ensure reader is released after query.
-        driverContext.setUsedCacheEntry(cacheEntry);
-      } else if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
-          driverContext.getCacheUsage().getCacheEntry() != null &&
-          driverContext.getPlan().getFetchTask() != null) {
-        // Save results to the cache for future queries to use.
-        PerfLogger perfLogger = SessionState.getPerfLogger();
-        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
-
-        ValidTxnWriteIdList txnWriteIdList = null;
-        if (driverContext.getPlan().hasAcidResourcesInQuery()) {
-          txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf());
-        }
-        CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
-        boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(
-            cacheEntry,
-            driverContext.getPlan().getFetchTask().getWork());
-        LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry);
-        if (savedToCache) {
-          useFetchFromCache(driverContext.getCacheUsage().getCacheEntry());
-          // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released.
-          driverContext.setUsedCacheEntry(driverContext.getCacheUsage().getCacheEntry());
-        }
-
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
-      }
-    }
-  }
-
-  private void execute() throws CommandProcessorException {
-    PerfLogger perfLogger = SessionState.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
-
-    boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME));
-
-    int maxlen;
-    if ("spark".equals(driverContext.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
-      maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH);
-    } else {
-      maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
-    }
-    Metrics metrics = MetricsFactory.getInstance();
-
-    String queryId = driverContext.getPlan().getQueryId();
-    // Get the query string from the conf file as the compileInternal() method might
-    // hide sensitive information during query redaction.
-    String queryStr = driverContext.getConf().getQueryString();
-
-    driverState.lock();
-    try {
-      // if query is not in compiled state, or executing state which is carried over from
-      // a combined compile/execute in runInternal, throws the error
-      if (!driverState.isCompiled() && !driverState.isExecuting()) {
-        String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " + queryStr;
-        CONSOLE.printError(errorMessage);
-        throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
-      } else {
-        driverState.executing();
-      }
-    } finally {
-      driverState.unlock();
-    }
-
-    HookContext hookContext = null;
-
-    // Whether there's any error occurred during query execution. Used for query lifetime hook.
-    boolean executionError = false;
-
-    try {
-      LOG.info("Executing command(queryId=" + queryId + "): " + queryStr);
-      // compile and execute can get called from different threads in case of HS2
-      // so clear timing in this thread's Hive object before proceeding.
-      Hive.get().clearMetaCallTiming();
-
-      driverContext.getPlan().setStarted();
-
-      if (SessionState.get() != null) {
-        SessionState.get().getHiveHistory().startQuery(queryStr, queryId);
-        SessionState.get().getHiveHistory().logPlanProgress(driverContext.getPlan());
-      }
-      resStream = null;
-
-      SessionState ss = SessionState.get();
-
-      // TODO: should this use getUserFromAuthenticator?
-      hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(),
-          context.getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(),
-          InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(),
-          ss.isHiveServerQuery(), perfLogger, driverContext.getQueryInfo(), context);
-      hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
-
-      driverContext.getHookRunner().runPreHooks(hookContext);
-
-      // Trigger query hooks before query execution.
-      driverContext.getHookRunner().runBeforeExecutionHook(queryStr, hookContext);
-
-      setQueryDisplays(driverContext.getPlan().getRootTasks());
-      int mrJobs = Utilities.getMRTasks(driverContext.getPlan().getRootTasks()).size();
-      int jobs = mrJobs + Utilities.getTezTasks(driverContext.getPlan().getRootTasks()).size()
-          + Utilities.getSparkTasks(driverContext.getPlan().getRootTasks()).size();
-      if (jobs > 0) {
-        logMrWarning(mrJobs);
-        CONSOLE.printInfo("Query ID = " + queryId);
-        CONSOLE.printInfo("Total jobs = " + jobs);
-      }
-      if (SessionState.get() != null) {
-        SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
-            String.valueOf(jobs));
-        SessionState.get().getHiveHistory().setIdToTableMap(driverContext.getPlan().getIdToTableNameMap());
-      }
-      String jobname = Utilities.abbreviate(queryStr, maxlen - 6);
-
-      // A runtime that launches runnable tasks as separate Threads through
-      // TaskRunners
-      // As soon as a task isRunnable, it is put in a queue
-      // At any time, at most maxthreads tasks can be running
-      // The main thread polls the TaskRunners to check if they have finished.
-
-      DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext, perfLogger);
-
-      taskQueue = new TaskQueue(context); // for canceling the query (should be bound to session?)
-      taskQueue.prepare(driverContext.getPlan());
-
-      context.setHDFSCleanup(true);
-
-      SessionState.get().setMapRedStats(new LinkedHashMap<>());
-      SessionState.get().setStackTraces(new HashMap<>());
-      SessionState.get().setLocalMapRedErrors(new HashMap<>());
-
-      // Add root Tasks to runnable
-      for (Task<?> tsk : driverContext.getPlan().getRootTasks()) {
-        // This should never happen, if it does, it's a bug with the potential to produce
-        // incorrect results.
-        assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
-        taskQueue.addToRunnable(tsk);
-
-        if (metrics != null) {
-          tsk.updateTaskMetrics(metrics);
-        }
-      }
-
-      preExecutionCacheActions();
-
-      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
-      // Loop while you either have tasks running, or tasks queued up
-      while (taskQueue.isRunning()) {
-        // Launch upto maxthreads tasks
-        Task<?> task;
-        int maxthreads = HiveConf.getIntVar(driverContext.getConf(), HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
-        while ((task = taskQueue.getRunnable(maxthreads)) != null) {
-          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, taskQueue);
-          if (!runner.isRunning()) {
-            break;
-          }
-        }
-
-        // poll the Tasks to see which one completed
-        TaskRunner tskRun = taskQueue.pollFinished();
-        if (tskRun == null) {
-          continue;
-        }
-        /*
-          This should be removed eventually. HIVE-17814 gives more detail
-          explanation of whats happening and HIVE-17815 as to why this is done.
-          Briefly for replication the graph is huge and so memory pressure is going to be huge if
-          we keep a lot of references around.
-        */
-        String opName = driverContext.getPlan().getOperationName();
-        boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName())
-            || opName.equals(HiveOperation.REPLLOAD.getOperationName());
-        if (!isReplicationOperation) {
-          hookContext.addCompleteTask(tskRun);
-        }
-
-        driverContext.getQueryDisplay().setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult());
-
-        Task<?> tsk = tskRun.getTask();
-        TaskResult result = tskRun.getTaskResult();
-
-        int exitVal = result.getExitVal();
-        DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext,
-            perfLogger);
-
-        if (exitVal != 0) {
-          Task<?> backupTask = tsk.getAndInitBackupTask();
-          if (backupTask != null) {
-            String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
-            CONSOLE.printError(errorMessage);
-            errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
-            CONSOLE.printError(errorMessage);
-
-            // add backup task to runnable
-            if (TaskQueue.isLaunchable(backupTask)) {
-              taskQueue.addToRunnable(backupTask);
-            }
-            continue;
-
-          } else {
-            String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
-            if (taskQueue.isShutdown()) {
-              errorMessage = "FAILED: Operation cancelled. " + errorMessage;
-            }
-            DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext,
-              errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
-            String sqlState = "08S01";
-
-            // 08S01 (Communication error) is the default sql state.  Override the sqlstate
-            // based on the ErrorMsg set in HiveException.
-            if (result.getTaskError() instanceof HiveException) {
-              ErrorMsg errorMsg = ((HiveException) result.getTaskError()).
-                  getCanonicalErrorMsg();
-              if (errorMsg != ErrorMsg.GENERIC_ERROR) {
-                sqlState = errorMsg.getSQLState();
-              }
-            }
-
-            CONSOLE.printError(errorMessage);
-            taskQueue.shutdown();
-            // in case we decided to run everything in local mode, restore the
-            // the jobtracker setting to its initial value
-            context.restoreOriginalTracker();
-            throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState,
-                result.getTaskError());
-          }
-        }
-
-        taskQueue.finished(tskRun);
-
-        if (SessionState.get() != null) {
-          SessionState.get().getHiveHistory().setTaskProperty(queryId, tsk.getId(),
-              Keys.TASK_RET_CODE, String.valueOf(exitVal));
-          SessionState.get().getHiveHistory().endTask(queryId, tsk);
-        }
-
-        if (tsk.getChildTasks() != null) {
-          for (Task<?> child : tsk.getChildTasks()) {
-            if (TaskQueue.isLaunchable(child)) {
-              taskQueue.addToRunnable(child);
-            }
-          }
-        }
-      }
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
-
-      postExecutionCacheActions();
-
-      // in case we decided to run everything in local mode, restore the
-      // the jobtracker setting to its initial value
-      context.restoreOriginalTracker();
-
-      if (taskQueue.isShutdown()) {
-        String errorMessage = "FAILED: Operation cancelled";
-        DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, null);
-        CONSOLE.printError(errorMessage);
-        throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
-      }
-
-      // remove incomplete outputs.
-      // Some incomplete outputs may be added at the beginning, for eg: for dynamic partitions.
-      // remove them
-      HashSet<WriteEntity> remOutputs = new LinkedHashSet<WriteEntity>();
-      for (WriteEntity output : driverContext.getPlan().getOutputs()) {
-        if (!output.isComplete()) {
-          remOutputs.add(output);
-        }
-      }
-
-      for (WriteEntity output : remOutputs) {
-        driverContext.getPlan().getOutputs().remove(output);
-      }
-
-
-      hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
-
-      driverContext.getHookRunner().runPostExecHooks(hookContext);
-
-      if (SessionState.get() != null) {
-        SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
-            String.valueOf(0));
-        SessionState.get().getHiveHistory().printRowCount(queryId);
-      }
-      releasePlan(driverContext.getPlan());
-    } catch (CommandProcessorException cpe) {
-      executionError = true;
-      throw cpe;
-    } catch (Throwable e) {
-      executionError = true;
-
-      DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(),
-          hookContext, perfLogger);
-
-      context.restoreOriginalTracker();
-      if (SessionState.get() != null) {
-        SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_RET_CODE,
-            String.valueOf(12));
-      }
-      // TODO: do better with handling types of Exception here
-      String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
-      if (hookContext != null) {
-        try {
-          DriverUtils.invokeFailureHooks(driverContext, perfLogger, hookContext, errorMessage, e);
-        } catch (Exception t) {
-          LOG.warn("Failed to invoke failure hook", t);
-        }
-      }
-      CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
-      throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e);
-    } finally {
-      // Trigger query hooks after query completes its execution.
-      try {
-        driverContext.getHookRunner().runAfterExecutionHook(queryStr, hookContext, executionError);
-      } catch (Exception e) {
-        LOG.warn("Failed when invoking query after execution hook", e);
-      }
-
-      if (SessionState.get() != null) {
-        SessionState.get().getHiveHistory().endQuery(queryId);
-      }
-      if (noName) {
-        driverContext.getConf().set(MRJobConfig.JOB_NAME, "");
-      }
-      double duration = perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE)/1000.00;
-
-      ImmutableMap<String, Long> executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution");
-      driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);
-
-      Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
-      if (stats != null && !stats.isEmpty()) {
-        long totalCpu = 0;
-        long numModifiedRows = 0;
-        CONSOLE.printInfo("MapReduce Jobs Launched: ");
-        for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
-          CONSOLE.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
-          totalCpu += entry.getValue().getCpuMSec();
-
-          if (numModifiedRows > -1) {
-            //if overflow, then numModifiedRows is set as -1. Else update numModifiedRows with the sum.
-            numModifiedRows = addWithOverflowCheck(numModifiedRows, entry.getValue().getNumModifiedRows());
-          }
-        }
-        driverContext.getQueryState().setNumModifiedRows(numModifiedRows);
-        CONSOLE.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
-      }
-      SparkSession ss = SessionState.get().getSparkSession();
-      if (ss != null) {
-        ss.onQueryCompletion(queryId);
-      }
-      driverState.lock();
-      try {
-        driverState.executionFinished(executionError);
-      } finally {
-        driverState.unlock();
-      }
-      if (driverState.isAborted()) {
-        LOG.info("Executing command(queryId=" + queryId + ") has been interrupted after " + duration + " seconds");
-      } else {
-        LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds");
-      }
-    }
-  }
-
-  private long addWithOverflowCheck(long val1, long val2) {
-    try {
-      return Math.addExact(val1, val2);
-    } catch (ArithmeticException e) {
-      return -1;
-    }
-  }
-
-  private void releasePlan(QueryPlan plan) {
-    // Plan maybe null if Driver.close is called in another thread for the same Driver object
-    driverState.lock();
-    try {
-      if (plan != null) {
-        plan.setDone();
-        if (SessionState.get() != null) {
-          try {
-            SessionState.get().getHiveHistory().logPlanProgress(plan);
-          } catch (Exception e) {
-            // Log and ignore
-            LOG.warn("Could not log query plan progress", e);
-          }
-        }
-      }
-    } finally {
-      driverState.unlock();
-    }
-  }
-
-  private void setQueryDisplays(List<Task<?>> tasks) {
-    if (tasks != null) {
-      Set<Task<?>> visited = new HashSet<Task<?>>();
-      while (!tasks.isEmpty()) {
-        tasks = setQueryDisplays(tasks, visited);
-      }
-    }
-  }
-
-  private List<Task<?>> setQueryDisplays(
-          List<Task<?>> tasks,
-          Set<Task<?>> visited) {
-    List<Task<?>> childTasks = new ArrayList<>();
-    for (Task<?> task : tasks) {
-      if (visited.contains(task)) {
-        continue;
-      }
-      task.setQueryDisplay(driverContext.getQueryDisplay());
-      if (task.getDependentTasks() != null) {
-        childTasks.addAll(task.getDependentTasks());
-      }
-      visited.add(task);
-    }
-    return childTasks;
-  }
-
-  private void logMrWarning(int mrJobs) {
-    if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(driverContext.getConf(), ConfVars.HIVE_EXECUTION_ENGINE)))) {
-      return;
-    }
-    String warning = HiveConf.generateMrDeprecationWarning();
-    LOG.warn(warning);
-  }
-
-  private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task tsk) {
-    String errorMessage = "FAILED: Execution Error, return code " + exitVal + " from " + tsk.getClass().getName();
-    if (downstreamError != null) {
-      //here we assume that upstream code may have parametrized the msg from ErrorMsg
-      //so we want to keep it
-      if (downstreamError.getMessage() != null) {
-        errorMessage += ". " + downstreamError.getMessage();
-      } else {
-        errorMessage += ". " + StringUtils.stringifyException(downstreamError);
-      }
-    }
-    else {
-      ErrorMsg em = ErrorMsg.getErrorMsg(exitVal);
-      if (em != null) {
-        errorMessage += ". " +  em.getMsg();
-      }
-    }
-
-    return errorMessage;
-  }
-
-  /**
-   * Launches a new task
-   *
-   * @param tsk
-   *          task being launched
-   * @param queryId
-   *          Id of the query containing the task
-   * @param noName
-   *          whether the task has a name set
-   * @param jobname
-   *          name of the task, if it is a map-reduce job
-   * @param jobs
-   *          number of map-reduce jobs
-   * @param taskQueue
-   *          the task queue
-   */
-  private TaskRunner launchTask(Task<?> tsk, String queryId, boolean noName,
-      String jobname, int jobs, TaskQueue taskQueue) throws HiveException {
-    if (SessionState.get() != null) {
-      SessionState.get().getHiveHistory().startTask(queryId, tsk, tsk.getClass().getName());
-    }
-    if (tsk.isMapRedTask() && !(tsk instanceof ConditionalTask)) {
-      if (noName) {
-        driverContext.getConf().set(MRJobConfig.JOB_NAME, jobname + " (" + tsk.getId() + ")");
-      }
-      driverContext.getConf().set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, tsk.getId());
-      Utilities.setWorkflowAdjacencies(driverContext.getConf(), driverContext.getPlan());
-      taskQueue.incCurJobNo(1);
-      CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobs);
-    }
-    tsk.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context);
-    TaskRunner tskRun = new TaskRunner(tsk, taskQueue);
-
-    taskQueue.launching(tskRun);
-    // Launch Task
-    if (HiveConf.getBoolVar(tsk.getConf(), HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) {
-      // Launch it in the parallel mode, as a separate thread only for MR tasks
-      if (LOG.isInfoEnabled()){
-        LOG.info("Starting task [" + tsk + "] in parallel");
-      }
-      tskRun.start();
-    } else {
-      if (LOG.isInfoEnabled()){
-        LOG.info("Starting task [" + tsk + "] in serial mode");
-      }
-      tskRun.runSequential();
-    }
-    return tskRun;
-  }
-
   @Override
   public boolean isFetchingTable() {
     return driverContext.getFetchTask() != null;
@@ -1635,10 +1089,10 @@ public class Driver implements IDriver {
       return driverContext.getFetchTask().fetch(res);
     }
 
-    if (resStream == null) {
-      resStream = context.getStream();
+    if (driverContext.getResStream() == null) {
+      driverContext.setResStream(context.getStream());
     }
-    if (resStream == null) {
+    if (driverContext.getResStream() == null) {
       return false;
     }
 
@@ -1646,7 +1100,7 @@ public class Driver implements IDriver {
     String row = null;
 
     while (numRows < maxRows) {
-      if (resStream == null) {
+      if (driverContext.getResStream() == null) {
         if (numRows > 0) {
           return true;
         } else {
@@ -1657,7 +1111,7 @@ public class Driver implements IDriver {
       bos.reset();
       Utilities.StreamStatus ss;
       try {
-        ss = Utilities.readColumn(resStream, bos);
+        ss = Utilities.readColumn(driverContext.getResStream(), bos);
         if (bos.getLength() > 0) {
           row = new String(bos.getData(), 0, bos.getLength(), "UTF-8");
         } else if (ss == Utilities.StreamStatus.TERMINATED) {
@@ -1675,7 +1129,7 @@ public class Driver implements IDriver {
       }
 
       if (ss == Utilities.StreamStatus.EOF) {
-        resStream = context.getStream();
+        driverContext.setResStream(context.getStream());
       }
     }
     return true;
@@ -1696,7 +1150,7 @@ public class Driver implements IDriver {
       driverContext.getFetchTask().initialize(driverContext.getQueryState(), null, null, context);
     } else {
       context.resetStream();
-      resStream = null;
+      driverContext.setResStream(null);
     }
   }
 
@@ -1756,9 +1210,9 @@ public class Driver implements IDriver {
 
   private void releaseResStream() {
     try {
-      if (resStream != null) {
-        ((FSDataInputStream) resStream).close();
-        resStream = null;
+      if (driverContext.getResStream() != null) {
+        ((FSDataInputStream) driverContext.getResStream()).close();
+        driverContext.setResStream(null);
       }
     } catch (Exception e) {
       LOG.debug(" Exception while closing the resStream ", e);
@@ -1883,11 +1337,11 @@ public class Driver implements IDriver {
 
   /**
    * Set the HS2 operation handle's guid string
-   * @param opId base64 encoded guid string
+   * @param operationId base64 encoded guid string
    */
   @Override
-  public void setOperationId(String opId) {
-    this.operationId = opId;
+  public void setOperationId(String operationId) {
+    driverContext.setOperationId(operationId);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index bbf7fe5..a8c83fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql;
 
+import java.io.DataInput;
+
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Schema;
@@ -71,6 +73,11 @@ public class DriverContext {
   private Context backupContext = null;
   private boolean retrial = false;
 
+  private DataInput resStream;
+
+  // HS2 operation handle guid string
+  private String operationId;
+
   public DriverContext(QueryState queryState, QueryInfo queryInfo, HookRunner hookRunner,
       HiveTxnManager initTxnManager) {
     this.queryState = queryState;
@@ -215,4 +222,20 @@ public class DriverContext {
   public void setRetrial(boolean retrial) {
     this.retrial = retrial;
   }
+
+  public DataInput getResStream() {
+    return resStream;
+  }
+
+  public void setResStream(DataInput resStream) {
+    this.resStream = resStream;
+  }
+
+  public String getOperationId() {
+    return operationId;
+  }
+
+  public void setOperationId(String operationId) {
+    this.operationId = operationId;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Executor.java b/ql/src/java/org/apache/hadoop/hive/ql/Executor.java
new file mode 100644
index 0000000..e9909a9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Executor.java
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.metrics.common.Metrics;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
+import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.DagUtils;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.TaskResult;
+import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Executes the Query Plan.
+ */
+public class Executor {
+  private static final String CLASS_NAME = Driver.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+  private static final LogHelper CONSOLE = new LogHelper(LOG);
+
+  private final Context context;
+  private final DriverContext driverContext;
+  private final DriverState driverState;
+  private final TaskQueue taskQueue;
+
+  private HookContext hookContext;
+
+  public Executor(Context context, DriverContext driverContext, DriverState driverState, TaskQueue taskQueue) {
+    this.context = context;
+    this.driverContext = driverContext;
+    this.driverState = driverState;
+    this.taskQueue = taskQueue;
+  }
+
+  public void execute() throws CommandProcessorException {
+    SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
+
+    boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME));
+
+    checkState();
+
+    // Whether any error occurred during query execution. Used for the query lifetime hook.
+    boolean executionError = false;
+
+    try {
+      LOG.info("Executing command(queryId=" + driverContext.getQueryId() + "): " + driverContext.getQueryString());
+
+      // TODO: should this use getUserFromAuthenticator?
+      hookContext = new PrivateHookContext(driverContext.getPlan(), driverContext.getQueryState(),
+          context.getPathToCS(), SessionState.get().getUserName(), SessionState.get().getUserIpAddress(),
+          InetAddress.getLocalHost().getHostAddress(), driverContext.getOperationId(),
+          SessionState.get().getSessionId(), Thread.currentThread().getName(), SessionState.get().isHiveServerQuery(),
+          SessionState.getPerfLogger(), driverContext.getQueryInfo(), context);
+
+      preExecutionActions();
+      preExecutionCacheActions();
+      runTasks(noName);
+      postExecutionCacheActions();
+      postExecutionActions();
+    } catch (CommandProcessorException cpe) {
+      executionError = true;
+      throw cpe;
+    } catch (Throwable e) {
+      executionError = true;
+      DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(),
+          hookContext, SessionState.getPerfLogger());
+      handleException(hookContext, e);
+    } finally {
+      cleanUp(noName, hookContext, executionError);
+    }
+  }
+
+  private void checkState() throws CommandProcessorException {
+    driverState.lock();
+    try {
+      // If the query is in neither the compiled state nor the executing state (which is carried
+      // over from a combined compile/execute in runInternal), throw an error.
+      if (!driverState.isCompiled() && !driverState.isExecuting()) {
+        String errorMessage = "FAILED: unexpected driverstate: " + driverState + ", for query " +
+            driverContext.getQueryString();
+        CONSOLE.printError(errorMessage);
+        throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
+      } else {
+        driverState.executing();
+      }
+    } finally {
+      driverState.unlock();
+    }
+  }
+
+  private void preExecutionActions() throws Exception {
+    // compile and execute can get called from different threads in case of HS2
+    // so clear timing in this thread's Hive object before proceeding.
+    Hive.get().clearMetaCallTiming();
+
+    driverContext.getPlan().setStarted();
+
+    SessionState.get().getHiveHistory().startQuery(driverContext.getQueryString(), driverContext.getQueryId());
+    SessionState.get().getHiveHistory().logPlanProgress(driverContext.getPlan());
+    driverContext.setResStream(null);
+
+    hookContext.setHookType(HookContext.HookType.PRE_EXEC_HOOK);
+    driverContext.getHookRunner().runPreHooks(hookContext);
+
+    // Trigger query hooks before query execution.
+    driverContext.getHookRunner().runBeforeExecutionHook(driverContext.getQueryString(), hookContext);
+
+    setQueryDisplays(driverContext.getPlan().getRootTasks());
+
+    // A runtime that launches runnable tasks as separate Threads through TaskRunners
+    // As soon as a task isRunnable, it is put in a queue
+    // At any time, at most maxthreads tasks can be running
+    // The main thread polls the TaskRunners to check if they have finished.
+
+    DriverUtils.checkInterrupted(driverState, driverContext, "before running tasks.", hookContext,
+        SessionState.getPerfLogger());
+
+    taskQueue.prepare(driverContext.getPlan());
+
+    context.setHDFSCleanup(true);
+
+    SessionState.get().setMapRedStats(new LinkedHashMap<>());
+    SessionState.get().setStackTraces(new HashMap<>());
+    SessionState.get().setLocalMapRedErrors(new HashMap<>());
+
+    // Add root Tasks to runnable
+    Metrics metrics = MetricsFactory.getInstance();
+    for (Task<?> task : driverContext.getPlan().getRootTasks()) {
+      // This should never happen, if it does, it's a bug with the potential to produce
+      // incorrect results.
+      assert task.getParentTasks() == null || task.getParentTasks().isEmpty();
+      taskQueue.addToRunnable(task);
+
+      if (metrics != null) {
+        task.updateTaskMetrics(metrics);
+      }
+    }
+  }
+
+  private void setQueryDisplays(List<Task<?>> tasks) {
+    if (tasks != null) {
+      Set<Task<?>> visited = new HashSet<Task<?>>();
+      while (!tasks.isEmpty()) {
+        tasks = setQueryDisplays(tasks, visited);
+      }
+    }
+  }
+
+  private List<Task<?>> setQueryDisplays(List<Task<?>> tasks, Set<Task<?>> visited) {
+    List<Task<?>> childTasks = new ArrayList<>();
+    for (Task<?> task : tasks) {
+      if (visited.contains(task)) {
+        continue;
+      }
+      task.setQueryDisplay(driverContext.getQueryDisplay());
+      if (task.getDependentTasks() != null) {
+        childTasks.addAll(task.getDependentTasks());
+      }
+      visited.add(task);
+    }
+    return childTasks;
+  }
+
+  private void preExecutionCacheActions() throws Exception {
+    if (driverContext.getCacheUsage() == null) {
+      return;
+    }
+
+    if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+        driverContext.getPlan().getFetchTask() != null) {
+      ValidTxnWriteIdList txnWriteIdList = null;
+      if (driverContext.getPlan().hasAcidResourcesInQuery()) {
+        txnWriteIdList = AcidUtils.getValidTxnWriteIdList(driverContext.getConf());
+      }
+      // The results of this query execution might be cacheable.
+      // Add a placeholder entry in the cache so other queries know this result is pending.
+      CacheEntry pendingCacheEntry =
+          QueryResultsCache.getInstance().addToCache(driverContext.getCacheUsage().getQueryInfo(), txnWriteIdList);
+      if (pendingCacheEntry != null) {
+        // Update cacheUsage to reference the pending entry.
+        this.driverContext.getCacheUsage().setCacheEntry(pendingCacheEntry);
+      }
+    }
+  }
+
+  private void runTasks(boolean noName) throws Exception {
+    SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
+
+    int jobCount = getJobCount();
+    String jobName = getJobName();
+
+    // Loop while you either have tasks running, or tasks queued up
+    while (taskQueue.isRunning()) {
+      launchTasks(noName, jobCount, jobName);
+      handleFinished();
+    }
+
+    SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
+  }
+
+  private void handleFinished() throws Exception {
+    // poll the Tasks to see which one completed
+    TaskRunner taskRun = taskQueue.pollFinished();
+    if (taskRun == null) {
+      return;
+    }
+    /*
+      This should be removed eventually. HIVE-17814 gives a more detailed
+      explanation of what's happening and HIVE-17815 as to why this is done.
+      Briefly, for replication the graph is huge and so memory pressure is going to be huge if
+      we keep a lot of references around.
+    */
+    String opName = driverContext.getPlan().getOperationName();
+    boolean isReplicationOperation = opName.equals(HiveOperation.REPLDUMP.getOperationName())
+        || opName.equals(HiveOperation.REPLLOAD.getOperationName());
+    if (!isReplicationOperation) {
+      hookContext.addCompleteTask(taskRun);
+    }
+
+    driverContext.getQueryDisplay().setTaskResult(taskRun.getTask().getId(), taskRun.getTaskResult());
+
+    Task<?> task = taskRun.getTask();
+    TaskResult result = taskRun.getTaskResult();
+
+    int exitVal = result.getExitVal();
+    DriverUtils.checkInterrupted(driverState, driverContext, "when checking the execution result.", hookContext,
+        SessionState.getPerfLogger());
+
+    if (exitVal != 0) {
+      handleTaskFailure(task, result, exitVal);
+      return;
+    }
+
+    taskQueue.finished(taskRun);
+
+    SessionState.get().getHiveHistory().setTaskProperty(driverContext.getQueryId(), task.getId(),
+        Keys.TASK_RET_CODE, String.valueOf(exitVal));
+    SessionState.get().getHiveHistory().endTask(driverContext.getQueryId(), task);
+
+    if (task.getChildTasks() != null) {
+      for (Task<?> child : task.getChildTasks()) {
+        if (TaskQueue.isLaunchable(child)) {
+          taskQueue.addToRunnable(child);
+        }
+      }
+    }
+  }
+
+  private String getJobName() {
+    int maxlen;
+    if ("spark".equals(driverContext.getConf().getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+      maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH);
+    } else {
+      maxlen = driverContext.getConf().getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+    }
+    return Utilities.abbreviate(driverContext.getQueryString(), maxlen - 6);
+  }
+
+  private int getJobCount() {
+    int mrJobCount = Utilities.getMRTasks(driverContext.getPlan().getRootTasks()).size();
+    int jobCount = mrJobCount + Utilities.getTezTasks(driverContext.getPlan().getRootTasks()).size()
+        + Utilities.getSparkTasks(driverContext.getPlan().getRootTasks()).size();
+    if (jobCount > 0) {
+      if (mrJobCount > 0 && "mr".equals(HiveConf.getVar(driverContext.getConf(), ConfVars.HIVE_EXECUTION_ENGINE))) {
+        LOG.warn(HiveConf.generateMrDeprecationWarning());
+      }
+      CONSOLE.printInfo("Query ID = " + driverContext.getPlan().getQueryId());
+      CONSOLE.printInfo("Total jobs = " + jobCount);
+    }
+    if (SessionState.get() != null) {
+      SessionState.get().getHiveHistory().setQueryProperty(driverContext.getPlan().getQueryId(), Keys.QUERY_NUM_TASKS,
+          String.valueOf(jobCount));
+      SessionState.get().getHiveHistory().setIdToTableMap(driverContext.getPlan().getIdToTableNameMap());
+    }
+    return jobCount;
+  }
+
+  private void launchTasks(boolean noName, int jobCount, String jobName) throws HiveException {
+    // Launch up to maxthreads tasks
+    Task<?> task;
+    int maxthreads = HiveConf.getIntVar(driverContext.getConf(), HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
+    while ((task = taskQueue.getRunnable(maxthreads)) != null) {
+      TaskRunner runner = launchTask(task, noName, jobName, jobCount);
+      if (!runner.isRunning()) {
+        break;
+      }
+    }
+  }
+
+  private TaskRunner launchTask(Task<?> task, boolean noName, String jobName, int jobCount) throws HiveException {
+    SessionState.get().getHiveHistory().startTask(driverContext.getQueryId(), task, task.getClass().getName());
+
+    if (task.isMapRedTask() && !(task instanceof ConditionalTask)) {
+      if (noName) {
+        driverContext.getConf().set(MRJobConfig.JOB_NAME, jobName + " (" + task.getId() + ")");
+      }
+      driverContext.getConf().set(DagUtils.MAPREDUCE_WORKFLOW_NODE_NAME, task.getId());
+      Utilities.setWorkflowAdjacencies(driverContext.getConf(), driverContext.getPlan());
+      taskQueue.incCurJobNo(1);
+      CONSOLE.printInfo("Launching Job " + taskQueue.getCurJobNo() + " out of " + jobCount);
+    }
+
+    task.initialize(driverContext.getQueryState(), driverContext.getPlan(), taskQueue, context);
+    TaskRunner taskRun = new TaskRunner(task, taskQueue);
+    taskQueue.launching(taskRun);
+
+    if (HiveConf.getBoolVar(task.getConf(), HiveConf.ConfVars.EXECPARALLEL) && task.canExecuteInParallel()) {
+      LOG.info("Starting task [" + task + "] in parallel");
+      taskRun.start();
+    } else {
+      LOG.info("Starting task [" + task + "] in serial mode");
+      taskRun.runSequential();
+    }
+    return taskRun;
+  }
+
+  private void handleTaskFailure(Task<?> task, TaskResult result, int exitVal)
+      throws HiveException, Exception, CommandProcessorException {
+    Task<?> backupTask = task.getAndInitBackupTask();
+    if (backupTask != null) {
+      String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), task);
+      CONSOLE.printError(errorMessage);
+      errorMessage = "ATTEMPT: Execute BackupTask: " + backupTask.getClass().getName();
+      CONSOLE.printError(errorMessage);
+
+      // add backup task to runnable
+      if (TaskQueue.isLaunchable(backupTask)) {
+        taskQueue.addToRunnable(backupTask);
+      }
+    } else {
+      String errorMessage = getErrorMsgAndDetail(exitVal, result.getTaskError(), task);
+      if (taskQueue.isShutdown()) {
+        errorMessage = "FAILED: Operation cancelled. " + errorMessage;
+      }
+      DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext,
+          errorMessage + Strings.nullToEmpty(task.getDiagnosticsMessage()), result.getTaskError());
+      String sqlState = "08S01";
+
+      // 08S01 (Communication error) is the default sql state.  Override the sqlstate
+      // based on the ErrorMsg set in HiveException.
+      if (result.getTaskError() instanceof HiveException) {
+        ErrorMsg errorMsg = ((HiveException) result.getTaskError()).
+            getCanonicalErrorMsg();
+        if (errorMsg != ErrorMsg.GENERIC_ERROR) {
+          sqlState = errorMsg.getSQLState();
+        }
+      }
+
+      CONSOLE.printError(errorMessage);
+      taskQueue.shutdown();
+      // in case we decided to run everything in local mode, restore
+      // the jobtracker setting to its initial value
+      context.restoreOriginalTracker();
+      throw DriverUtils.createProcessorException(driverContext, exitVal, errorMessage, sqlState,
+          result.getTaskError());
+    }
+  }
+
+  private String getErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task<?> task) {
+    String errorMessage = "FAILED: Execution Error, return code " + exitVal + " from " + task.getClass().getName();
+    if (downstreamError != null) {
+      //here we assume that upstream code may have parametrized the msg from ErrorMsg so we want to keep it
+      if (downstreamError.getMessage() != null) {
+        errorMessage += ". " + downstreamError.getMessage();
+      } else {
+        errorMessage += ". " + StringUtils.stringifyException(downstreamError);
+      }
+    } else {
+      ErrorMsg em = ErrorMsg.getErrorMsg(exitVal);
+      if (em != null) {
+        errorMessage += ". " +  em.getMsg();
+      }
+    }
+
+    return errorMessage;
+  }
+
+  private void postExecutionCacheActions() throws Exception {
+    if (driverContext.getCacheUsage() == null) {
+      return;
+    }
+
+    if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.QUERY_USING_CACHE) {
+      // Using a previously cached result.
+      CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
+
+      // Reader count already incremented during cache lookup.
+      // Save to usedCacheEntry to ensure reader is released after query.
+      driverContext.setUsedCacheEntry(cacheEntry);
+    } else if (driverContext.getCacheUsage().getStatus() == CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS &&
+        driverContext.getCacheUsage().getCacheEntry() != null && driverContext.getPlan().getFetchTask() != null) {
+      // Save results to the cache for future queries to use.
+      SessionState.getPerfLogger().PerfLogBegin(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
+
+      CacheEntry cacheEntry = driverContext.getCacheUsage().getCacheEntry();
+      boolean savedToCache = QueryResultsCache.getInstance().setEntryValid(cacheEntry,
+          driverContext.getPlan().getFetchTask().getWork());
+      LOG.info("savedToCache: {} ({})", savedToCache, cacheEntry);
+      if (savedToCache) {
+        useFetchFromCache(driverContext.getCacheUsage().getCacheEntry());
+        // setEntryValid() already increments the reader count. Set usedCacheEntry so it gets released.
+        driverContext.setUsedCacheEntry(driverContext.getCacheUsage().getCacheEntry());
+      }
+
+      SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.SAVE_TO_RESULTS_CACHE);
+    }
+  }
+
+  private void useFetchFromCache(CacheEntry cacheEntry) {
+    // Change query FetchTask to use new location specified in results cache.
+    FetchTask fetchTaskFromCache = (FetchTask) TaskFactory.get(cacheEntry.getFetchWork());
+    fetchTaskFromCache.initialize(driverContext.getQueryState(), driverContext.getPlan(), null, context);
+    driverContext.getPlan().setFetchTask(fetchTaskFromCache);
+    driverContext.setCacheUsage(new CacheUsage(CacheUsage.CacheStatus.QUERY_USING_CACHE, cacheEntry));
+  }
+
+  private void postExecutionActions() throws Exception {
+    // in case we decided to run everything in local mode, restore the jobtracker setting to its initial value
+    context.restoreOriginalTracker();
+
+    if (taskQueue.isShutdown()) {
+      String errorMessage = "FAILED: Operation cancelled";
+      DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, errorMessage, null);
+      CONSOLE.printError(errorMessage);
+      throw DriverUtils.createProcessorException(driverContext, 1000, errorMessage, "HY008", null);
+    }
+
+    // Remove incomplete outputs.
+    // Some incomplete outputs may be added at the beginning, e.g. for dynamic partitions; remove them
+    driverContext.getPlan().getOutputs().removeIf(x -> !x.isComplete());
+
+    hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
+    driverContext.getHookRunner().runPostExecHooks(hookContext);
+
+    SessionState.get().getHiveHistory().setQueryProperty(driverContext.getQueryId(), Keys.QUERY_RET_CODE,
+        String.valueOf(0));
+    SessionState.get().getHiveHistory().printRowCount(driverContext.getQueryId());
+    releasePlan(driverContext.getPlan());
+  }
+
+  private void releasePlan(QueryPlan plan) {
+    // Plan may be null if Driver.close is called in another thread for the same Driver object
+    driverState.lock();
+    try {
+      if (plan != null) {
+        plan.setDone();
+        if (SessionState.get() != null) {
+          try {
+            SessionState.get().getHiveHistory().logPlanProgress(plan);
+          } catch (Exception e) {
+            // Log and ignore
+            LOG.warn("Could not log query plan progress", e);
+          }
+        }
+      }
+    } finally {
+      driverState.unlock();
+    }
+  }
+
+  private void handleException(HookContext hookContext, Throwable e) throws CommandProcessorException {
+    context.restoreOriginalTracker();
+    if (SessionState.get() != null) {
+      SessionState.get().getHiveHistory().setQueryProperty(driverContext.getQueryId(), Keys.QUERY_RET_CODE,
+          String.valueOf(12));
+    }
+    // TODO: do better with handling types of Exception here
+    String errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
+    if (hookContext != null) {
+      try {
+        DriverUtils.invokeFailureHooks(driverContext, SessionState.getPerfLogger(), hookContext, errorMessage, e);
+      } catch (Exception t) {
+        LOG.warn("Failed to invoke failure hook", t);
+      }
+    }
+    CONSOLE.printError(errorMessage + "\n" + StringUtils.stringifyException(e));
+    throw DriverUtils.createProcessorException(driverContext, 12, errorMessage, "08S01", e);
+  }
+
+  private void cleanUp(boolean noName, HookContext hookContext, boolean executionError) {
+    // Trigger query hooks after query completes its execution.
+    try {
+      driverContext.getHookRunner().runAfterExecutionHook(driverContext.getQueryString(), hookContext, executionError);
+    } catch (Exception e) {
+      LOG.warn("Failed when invoking query after execution hook", e);
+    }
+
+    SessionState.get().getHiveHistory().endQuery(driverContext.getQueryId());
+    if (noName) {
+      driverContext.getConf().set(MRJobConfig.JOB_NAME, "");
+    }
+    double duration = SessionState.getPerfLogger().PerfLogEnd(CLASS_NAME, PerfLogger.DRIVER_EXECUTE) / 1000.00;
+
+    ImmutableMap<String, Long> executionHMSTimings = Hive.dumpMetaCallTimingWithoutEx("execution");
+    driverContext.getQueryDisplay().setHmsTimings(QueryDisplay.Phase.EXECUTION, executionHMSTimings);
+
+    logExecutionResourceUsage();
+
+    if (SessionState.get().getSparkSession() != null) {
+      SessionState.get().getSparkSession().onQueryCompletion(driverContext.getQueryId());
+    }
+
+    driverState.lock();
+    try {
+      driverState.executionFinished(executionError);
+    } finally {
+      driverState.unlock();
+    }
+
+    if (driverState.isAborted()) {
+      LOG.info("Executing command(queryId={}) has been interrupted after {} seconds", driverContext.getQueryId(),
+          duration);
+    } else {
+      LOG.info("Completed executing command(queryId={}); Time taken: {} seconds", driverContext.getQueryId(), duration);
+    }
+  }
+
+  private void logExecutionResourceUsage() {
+    Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
+    if (stats != null && !stats.isEmpty()) {
+      long totalCpu = 0;
+      long numModifiedRows = 0;
+      CONSOLE.printInfo("MapReduce Jobs Launched: ");
+      for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
+        CONSOLE.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
+        totalCpu += entry.getValue().getCpuMSec();
+
+        if (numModifiedRows > -1) {
+          // On overflow numModifiedRows is set to -1; otherwise it is updated with the running sum.
+          try {
+            numModifiedRows = Math.addExact(numModifiedRows, entry.getValue().getNumModifiedRows());
+          } catch (ArithmeticException e) {
+            numModifiedRows = -1;
+          }
+        }
+      }
+      driverContext.getQueryState().setNumModifiedRows(numModifiedRows);
+      CONSOLE.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java
index baad269..cd05f52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/IDriver.java
@@ -45,7 +45,7 @@ public interface IDriver extends CommandProcessor {
 
   QueryDisplay getQueryDisplay();
 
-  void setOperationId(String guid64);
+  void setOperationId(String operationId);
 
   CommandProcessorResponse run() throws CommandProcessorException;
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
index eab7f45..c307085 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecDriver.java
@@ -141,8 +141,8 @@ public class ReExecDriver implements IDriver {
   }
 
   @Override
-  public void setOperationId(String guid64) {
-    coreDriver.setOperationId(guid64);
+  public void setOperationId(String operationId) {
+    coreDriver.setOperationId(operationId);
   }
 
   @Override