Posted to commits@hive.apache.org by we...@apache.org on 2016/05/11 23:31:30 UTC

hive git commit: HIVE-13458 : Heartbeater doesn't fail query when heartbeat fails (Wei Zheng, reviewed by Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/master b9e4fe856 -> 66a021164


HIVE-13458 : Heartbeater doesn't fail query when heartbeat fails (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/66a02116
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/66a02116
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/66a02116

Branch: refs/heads/master
Commit: 66a02116453427601fd806fe999a753e3a201d49
Parents: b9e4fe8
Author: Wei Zheng <we...@apache.org>
Authored: Wed May 11 16:14:02 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Wed May 11 16:14:02 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  1 +
 .../java/org/apache/hadoop/hive/ql/Context.java | 18 ++++++
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |  3 +-
 .../hive/ql/exec/mr/HadoopJobExecHelper.java    | 20 ++++--
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  |  7 ++-
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |  2 +-
 .../hadoop/hive/ql/io/merge/MergeFileTask.java  |  2 +-
 .../ql/io/rcfile/stats/PartialScanTask.java     |  2 +-
 .../io/rcfile/truncate/ColumnTruncateTask.java  |  2 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 66 ++++++++++++++++----
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 +++++++
 .../index_compact_entry_limit.q.out             |  2 +-
 .../index_compact_size_limit.q.out              |  2 +-
 13 files changed, 124 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f2273c0..541af57 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1138,6 +1138,7 @@ public class HiveConf extends Configuration {
     HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
     HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only.  Will mark every ACID transaction aborted", false),
     HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only.  Will cause CompactorMR to fail.", false),
+    HIVETESTMODEFAILHEARTBEATER("hive.test.fail.heartbeater", false, "For testing only.  Will cause Heartbeater to fail.", false),
 
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),

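For context on how this flag is consumed: it is only honored when HiveConf's
HIVE_IN_TEST is also set, and its sole consumer is the new Heartbeater logic in
the DbTxnManager hunk below. A minimal sketch of the gating check (the enclosing
helper method is hypothetical; the two ConfVars are real):

    // Illustrative only -- this helper is not part of the patch.
    private static void maybeFailHeartbeatForTest(HiveConf conf) throws LockException {
      // Fire only inside tests AND when the failure flag is set.
      if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)
          && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
        // Same message the new unit test asserts on.
        throw new LockException(
            HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true");
      }
    }
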
http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 6f18c82..92b4e5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -43,9 +43,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -121,6 +123,8 @@ public class Context {
 
   private final String stagingDir;
 
+  private Heartbeater heartbeater;
+
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
@@ -760,4 +764,18 @@ public class Context {
   public CompilationOpContext getOpContext() {
     return opContext;
   }
+
+  public Heartbeater getHeartbeater() {
+    return heartbeater;
+  }
+
+  public void setHeartbeater(Heartbeater heartbeater) {
+    this.heartbeater = heartbeater;
+  }
+
+  public void checkHeartbeaterLockException() throws LockException {
+    if (getHeartbeater() != null && getHeartbeater().getLockException() != null) {
+      throw getHeartbeater().getLockException();
+    }
+  }
 }
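
The Heartbeater runs on a ScheduledExecutorService thread, so a failed heartbeat
cannot throw directly into the thread driving the query. The patch instead parks
the LockException on the Heartbeater, hangs the Heartbeater off the per-query
Context, and lets the job-monitoring loops poll checkHeartbeaterLockException().
A self-contained sketch of that pattern (class and method names here are
illustrative, not Hive's):

    class BackgroundBeat implements Runnable {
      private volatile Exception failure;   // written on the scheduler thread
      Exception getFailure() { return failure; }

      @Override
      public void run() {
        try {
          heartbeatOnce();                  // stand-in for txnMgr.heartbeat()
        } catch (Exception e) {
          failure = e;                      // park it; run() cannot throw checked exceptions
        }
      }

      private void heartbeatOnce() throws Exception { /* contact the metastore */ }
    }

The sketch marks the field volatile so the write is guaranteed visible to the
polling thread; the patch itself uses a plain lockException field.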

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 926f6e8..8a6499b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -433,10 +433,11 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       rj = jc.submitJob(job);
       this.jobID = rj.getJobID();
       updateStatusInQueryDisplay();
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
     } catch (Exception e) {
       e.printStackTrace();
+      setException(e);
       String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
       if (rj != null) {
         mesg = "Ended Job = " + rj.getJobID() + mesg;
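
Besides threading the Context through to progress(), note the added
setException(e): recording the failure on the Task preserves the root cause for
the caller instead of leaving only the printed stack trace and the generic
"Ended Job" message.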

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index c15316bb..5656f9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -213,7 +215,7 @@ public class HadoopJobExecHelper {
     return this.callBackObj.checkFatalErrors(ctrs, errMsg);
   }
 
-  private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
+  private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockException {
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
@@ -233,6 +235,10 @@ public class HadoopJobExecHelper {
     final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);
 
     while (!rj.isComplete()) {
+      if (th.getContext() != null) {
+        th.getContext().checkHeartbeaterLockException();
+      }
+
       try {
         Thread.sleep(pullInterval);
       } catch (InterruptedException e) {
@@ -452,6 +458,7 @@ public class HadoopJobExecHelper {
   private static class ExecDriverTaskHandle extends TaskHandle {
     JobClient jc;
     RunningJob rj;
+    Context ctx;
 
     JobClient getJobClient() {
       return jc;
@@ -461,9 +468,14 @@ public class HadoopJobExecHelper {
       return rj;
     }
 
-    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
+    Context getContext() {
+      return ctx;
+    }
+
+    public ExecDriverTaskHandle(JobClient jc, RunningJob rj, Context ctx) {
       this.jc = jc;
       this.rj = rj;
+      this.ctx = ctx;
     }
 
     public void setRunningJob(RunningJob job) {
@@ -517,7 +529,7 @@ public class HadoopJobExecHelper {
   }
 
 
-  public int progress(RunningJob rj, JobClient jc) throws IOException {
+  public int progress(RunningJob rj, JobClient jc, Context ctx) throws IOException, LockException {
     jobId = rj.getID();
 
     int returnVal = 0;
@@ -538,7 +550,7 @@ public class HadoopJobExecHelper {
 
     runningJobs.add(rj);
 
-    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
+    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, ctx);
     jobInfo(rj);
     MapRedStats mapRedStats = progress(th);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index b22991c..838f320 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -38,6 +38,7 @@ import java.util.TreeSet;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -217,7 +218,7 @@ public class TezJobMonitor {
    * @return int 0 - success, 1 - killed, 2 - failed
    */
   public int monitorExecution(final DAGClient dagClient, HiveConf conf,
-      DAG dag) throws InterruptedException {
+      DAG dag, Context ctx) throws InterruptedException {
     long monitorStartTime = System.currentTimeMillis();
     DAGStatus status = null;
     completed = new HashSet<String>();
@@ -247,6 +248,10 @@ public class TezJobMonitor {
     while (true) {
 
       try {
+        if (ctx != null) {
+          ctx.checkHeartbeaterLockException();
+        }
+
         status = dagClient.getDAGStatus(opts, checkInterval);
         progressMap = status.getVertexProgress();
         DAGStatus.State state = status.getState();
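
The Tez monitor applies the same check at the top of its DAG status loop, so
both execution engines surface a heartbeat failure on the next polling
iteration rather than discovering the lost locks only at commit time.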

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index f059aea..9e114c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -187,7 +187,7 @@ public class TezTask extends Task<TezWork> {
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
-      rc = monitor.monitorExecution(dagClient, conf, dag);
+      rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
       if (rc != 0) {
         this.setException(new HiveException(monitor.getDiagnostics()));
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 6b0343b..0fedd48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -151,7 +151,7 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index d31510d..6771b3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -223,7 +223,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 8acd6e0..ffc6311 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -187,7 +187,7 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 8c3a1d2..4539e71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -77,6 +77,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
    */
   private int statementId = -1;
 
+  // QueryId for the query in current transaction
+  private String queryId;
+
   // ExecutorService for sending heartbeat to metastore periodically.
   private static ScheduledExecutorService heartbeatExecutorService = null;
   private ScheduledFuture<?> heartbeatTask = null;
@@ -136,8 +139,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   @Override
   public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
     try {
-      acquireLocks(plan, ctx, username, true);
-      startHeartbeat();
+      acquireLocksWithHeartbeatDelay(plan, ctx, username, 0);
     }
     catch(LockException e) {
       if(e.getCause() instanceof TxnAbortedException) {
@@ -307,7 +309,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   @VisibleForTesting
   void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
     acquireLocks(plan, ctx, username, true);
-    startHeartbeat(delay);
+    ctx.setHeartbeater(startHeartbeat(delay));
+    queryId = plan.getQueryId();
   }
 
 
@@ -414,28 +417,51 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     }
   }
 
-  private void startHeartbeat() throws LockException {
-    startHeartbeat(0);
+  private Heartbeater startHeartbeat() throws LockException {
+    return startHeartbeat(0);
   }
 
   /**
    *  This is for testing only.  Normally client should call {@link #startHeartbeat()}
    *  Make the heartbeater start before an initial delay period.
    *  @param delay time to delay before first execution, in milliseconds
+   *  @return heartbeater
    */
-  void startHeartbeat(long delay) throws LockException {
+  Heartbeater startHeartbeat(long delay) throws LockException {
     long heartbeatInterval = getHeartbeatInterval(conf);
     assert heartbeatInterval > 0;
+    Heartbeater heartbeater = new Heartbeater(this, conf);
     heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
-        new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS);
-    LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " +
-        0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS);
+        heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS);
+    LOG.info("Started heartbeat with delay/interval = " + 0 + "/" + heartbeatInterval + " " +
+        TimeUnit.MILLISECONDS + " for query: " + queryId);
+    return heartbeater;
   }
 
-  private void stopHeartbeat() {
-    if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
-      heartbeatTask.cancel(false);
+  private void stopHeartbeat() throws LockException {
+    if (heartbeatTask != null) {
+      heartbeatTask.cancel(true);
+      long startTime = System.currentTimeMillis();
+      long sleepInterval = 100;
+      while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+        // We will wait for 30 seconds for the task to be cancelled.
+        // If it's still not cancelled (unlikely), we will just move on.
+        long now = System.currentTimeMillis();
+        if (now - startTime > 30000) {
+          LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId);
+          break;
+        }
+        try {
+          Thread.sleep(sleepInterval);
+        } catch (InterruptedException e) {
+        }
+        sleepInterval *= 2;
+      }
+      if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) {
+        LOG.info("Stopped heartbeat for query: " + queryId);
+      }
       heartbeatTask = null;
+      queryId = null;
     }
   }
 
@@ -553,13 +579,21 @@ public class DbTxnManager extends HiveTxnManagerImpl {
    */
   public static class Heartbeater implements Runnable {
     private HiveTxnManager txnMgr;
+    private HiveConf conf;
+
+    LockException lockException;
+    public LockException getLockException() {
+      return lockException;
+    }
 
     /**
      *
      * @param txnMgr transaction manager for this operation
      */
-    public Heartbeater(HiveTxnManager txnMgr) {
+    public Heartbeater(HiveTxnManager txnMgr, HiveConf conf) {
       this.txnMgr = txnMgr;
+      this.conf = conf;
+      lockException = null;
     }
 
     /**
@@ -568,10 +602,16 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     @Override
     public void run() {
       try {
+        // For negative testing purpose..
+        if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
+          throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true");
+        }
+
         LOG.debug("Heartbeating...");
         txnMgr.heartbeat();
       } catch (LockException e) {
         LOG.error("Failed trying to heartbeat " + e.getMessage());
+        lockException = e;
       }
     }
   }
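
Two details in this hunk do the heavy lifting. First, Heartbeater now records
the LockException it hits (including the injected test failure) instead of only
logging it, which is what the polling checks above rely on. Second,
stopHeartbeat() no longer fires and forgets: it cancels with interruption and
then waits, doubling its sleep each round, for up to roughly 30 seconds before
giving up. A standalone sketch of that cancellation wait (cancelWithBackoff is
a hypothetical name; the timing constants mirror the patch):

    import java.util.concurrent.ScheduledFuture;

    static boolean cancelWithBackoff(ScheduledFuture<?> task) {
      task.cancel(true);                        // interrupt if currently running
      long start = System.currentTimeMillis();
      long sleep = 100;                         // 100ms, then 200, 400, ...
      while (!task.isCancelled() && !task.isDone()) {
        if (System.currentTimeMillis() - start > 30000) {
          return false;                         // give up after ~30s, as the patch does
        }
        try {
          Thread.sleep(sleep);
        } catch (InterruptedException ignored) {
          // the patch likewise swallows the interrupt and keeps waiting
        }
        sleep *= 2;
      }
      return true;
    }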

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 472da0b..903337d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -51,6 +51,7 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -746,6 +747,27 @@ public class TestTxnCommands2 {
     Assert.assertEquals("", expected,
       runStatementOnDriver("select a,b from " + tblName + " order by a"));
   }
+
+  /**
+   * Simulate the scenario when a heartbeat failed due to client errors such as no locks or no txns being found.
+   * When a heartbeat fails, the query should be failed too.
+   * @throws Exception
+   */
+  @Test
+  public void testFailHeartbeater() throws Exception {
+    // Fail heartbeater, so that we can get a RuntimeException from the query.
+    // More specifically, it's the original IOException thrown by either MR's or Tez's progress monitoring loop.
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true);
+    Exception exception = null;
+    try {
+      runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
+    } catch (RuntimeException e) {
+      exception = e;
+    }
+    Assert.assertNotNull(exception);
+    Assert.assertTrue(exception.getMessage().contains("HIVETESTMODEFAILHEARTBEATER=true"));
+  }
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out b/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
index b65f94e..f844ee4 100644
--- a/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
+++ b/ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
@@ -34,4 +34,4 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
 Job Submission failed with exception 'java.io.IOException(org.apache.hadoop.hive.ql.metadata.HiveException: Number of compact index entries loaded during the query exceeded the maximum of 5 set in hive.index.compact.query.max.entries)'
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. org.apache.hadoop.hive.ql.metadata.HiveException: Number of compact index entries loaded during the query exceeded the maximum of 5 set in hive.index.compact.query.max.entries

http://git-wip-us.apache.org/repos/asf/hive/blob/66a02116/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/index_compact_size_limit.q.out b/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
index 299cc47..9ff8f8f 100644
--- a/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
+++ b/ql/src/test/results/clientnegative/index_compact_size_limit.q.out
@@ -34,4 +34,4 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
 Job Submission failed with exception 'java.io.IOException(Size of data to read during a compact-index-based query exceeded the maximum of 1024 set in hive.index.compact.query.max.size)'
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. Size of data to read during a compact-index-based query exceeded the maximum of 1024 set in hive.index.compact.query.max.size