Posted to commits@hive.apache.org by ek...@apache.org on 2016/01/16 21:31:40 UTC

hive git commit: HIVE-12366 Refactor Heartbeater logic for transaction (Wei Zheng via Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/master f3ea7773b -> aa0f8e062


HIVE-12366 Refactor Heartbeater logic for transaction (Wei Zheng via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa0f8e06
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa0f8e06
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa0f8e06

Branch: refs/heads/master
Commit: aa0f8e062827245b05c353d02537e51b9957bf36
Parents: f3ea777
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Jan 16 12:31:29 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Jan 16 12:31:29 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   4 +-
 .../apache/hadoop/hive/ql/exec/Heartbeater.java |  90 ------------
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |   2 +-
 .../hive/ql/exec/mr/HadoopJobExecHelper.java    |  16 +--
 .../hadoop/hive/ql/exec/tez/TezJobMonitor.java  |  22 +--
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |   2 +-
 .../hadoop/hive/ql/io/merge/MergeFileTask.java  |   2 +-
 .../ql/io/rcfile/stats/PartialScanTask.java     |   2 +-
 .../io/rcfile/truncate/ColumnTruncateTask.java  |   2 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 140 +++++++++++++++++++
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   9 ++
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |  10 ++
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |   2 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  91 +++++++++---
 15 files changed, 244 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1e8c34b..2c25cae 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -173,6 +173,7 @@ public class HiveConf extends Configuration {
       HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
       HiveConf.ConfVars.HIVE_TXN_MANAGER,
       HiveConf.ConfVars.HIVE_TXN_TIMEOUT,
+      HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE,
       HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH,
       HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION,
       HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED,
@@ -1508,6 +1509,8 @@ public class HiveConf extends Configuration {
         "no transactions."),
     HIVE_TXN_TIMEOUT("hive.txn.timeout", "300s", new TimeValidator(TimeUnit.SECONDS),
         "time after which transactions are declared aborted if the client has not sent a heartbeat."),
+    HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE("hive.txn.heartbeat.threadpool.size", 5, "The number of " +
+        "threads to use for heartbeating. For Hive CLI, 1 is enough. For HiveServer2, we need a few"),
     TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT("hive.txn.manager.dump.lock.state.on.acquire.timeout", false,
       "Set this to true so that when attempt to acquire a lock on resource times out, the current state" +
         " of the lock manager is dumped to log file.  This is for debugging.  See also " +

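The new hive.txn.heartbeat.threadpool.size knob above sizes the shared heartbeat executor: one thread is enough for Hive CLI, while HiveServer2 wants a few because many sessions share the same pool. A minimal sketch, assuming the Hive jars are on the classpath, of reading and overriding the setting through HiveConf; the class name and the override value of 20 are illustrative only:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class HeartbeatPoolSizeExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Default from this patch is 5, unless hive-site.xml overrides it.
        System.out.println(conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE));
        // Illustrative override for a busy HiveServer2 instance.
        conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE, 20);
        System.out.println(conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE));
      }
    }

The property can equally be set in hive-site.xml like any other HiveConf variable; DbTxnManager reads it when it first creates the executor (see initHeartbeatExecutorService() in the DbTxnManager hunk below).
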
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 9bff08f..020f037 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1026,9 +1026,7 @@ public class Driver implements CommandProcessor {
       if (ctx != null && ctx.getHiveLocks() != null) {
         hiveLocks.addAll(ctx.getHiveLocks());
       }
-      if (!hiveLocks.isEmpty()) {
-        txnMgr.getLockManager().releaseLocks(hiveLocks);
-      }
+      txnMgr.releaseLocks(hiveLocks);
     }
     hiveLocks.clear();
     if (ctx != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
deleted file mode 100644
index ff64563..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.LockException;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Class to handle heartbeats for MR and Tez tasks.
- */
-public class Heartbeater {
-  private long lastHeartbeat = 0;
-  private long heartbeatInterval = 0;
-  private boolean dontHeartbeat = false;
-  private HiveTxnManager txnMgr;
-  private Configuration conf;
-
-  static final private Logger LOG = LoggerFactory.getLogger(Heartbeater.class.getName());
-
-  /**
-   *
-   * @param txnMgr transaction manager for this operation
-   * @param conf Configuration for this operation
-   */
-  public Heartbeater(HiveTxnManager txnMgr, Configuration conf) {
-    this.txnMgr = txnMgr;
-    this.conf = conf;
-  }
-
-  /**
-   * Send a heartbeat to the metastore for locks and transactions.
-   * @throws IOException
-   */
-  public void heartbeat() throws IOException {
-    if (dontHeartbeat) return;
-
-    if (txnMgr == null) {
-      LOG.debug("txnMgr null, not heartbeating");
-      dontHeartbeat = true;
-      return;
-    }
-
-    if (heartbeatInterval == 0) {
-      // Multiply the heartbeat interval by 1000 to convert to milliseconds,
-      // but divide by 2 to give us a safety factor.
-      heartbeatInterval = HiveConf.getTimeVar(
-          conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
-      if (heartbeatInterval == 0) {
-        LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent");
-        dontHeartbeat = true;
-        LOG.debug("heartbeat interval 0, not heartbeating");
-        return;
-      }
-    }
-    long now = System.currentTimeMillis();
-    if (now - lastHeartbeat > heartbeatInterval) {
-      try {
-        LOG.debug("heartbeating");
-        txnMgr.heartbeat();
-      } catch (LockException e) {
-        LOG.warn("Failed trying to heartbeat " + e.getMessage());
-        throw new IOException(e);
-      }
-      lastHeartbeat = now;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index ab7fd93..472e8ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -429,7 +429,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager());
+      returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
     } catch (Exception e) {
       e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 5f35630..1b296b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -33,13 +33,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
-import org.apache.hadoop.hive.ql.exec.Heartbeater;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -230,14 +228,12 @@ public class HadoopJobExecHelper {
     int numReduce = -1;
     List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
     final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);
-    Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job);
 
     while (!rj.isComplete()) {
       try {
         Thread.sleep(pullInterval);
       } catch (InterruptedException e) {
       }
-      heartbeater.heartbeat();
 
       if (initializing && rj.getJobState() == JobStatus.PREP) {
         // No reason to poll untill the job is initialized
@@ -451,7 +447,6 @@ public class HadoopJobExecHelper {
   private static class ExecDriverTaskHandle extends TaskHandle {
     JobClient jc;
     RunningJob rj;
-    HiveTxnManager txnMgr;
 
     JobClient getJobClient() {
       return jc;
@@ -461,14 +456,9 @@ public class HadoopJobExecHelper {
       return rj;
     }
 
-    HiveTxnManager getTxnManager() {
-      return txnMgr;
-    }
-
-    public ExecDriverTaskHandle(JobClient jc, RunningJob rj, HiveTxnManager txnMgr) {
+    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
       this.jc = jc;
       this.rj = rj;
-      this.txnMgr = txnMgr;
     }
 
     public void setRunningJob(RunningJob job) {
@@ -522,7 +512,7 @@ public class HadoopJobExecHelper {
   }
 
 
-  public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws IOException {
+  public int progress(RunningJob rj, JobClient jc) throws IOException {
     jobId = rj.getID();
 
     int returnVal = 0;
@@ -543,7 +533,7 @@ public class HadoopJobExecHelper {
 
     runningJobs.add(rj);
 
-    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, txnMgr);
+    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
     jobInfo(rj);
     MapRedStats mapRedStats = progress(th);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index e81b73d..479bc93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -39,11 +39,9 @@ import java.util.TreeSet;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Heartbeater;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -63,21 +61,6 @@ import org.fusesource.jansi.Ansi;
 
 import com.google.common.base.Preconditions;
 
-import java.io.IOException;
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
 /**
  * TezJobMonitor keeps track of a tez job while it's being executed. It will
  * print status to the console and retrieve final status of the job after
@@ -220,11 +203,10 @@ public class TezJobMonitor {
    * monitorExecution handles status printing, failures during execution and final status retrieval.
    *
    * @param dagClient client that was used to kick off the job
-   * @param txnMgr transaction manager for this operation
    * @param conf configuration file for this operation
    * @return int 0 - success, 1 - killed, 2 - failed
    */
-  public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf,
+  public int monitorExecution(final DAGClient dagClient, HiveConf conf,
       DAG dag) throws InterruptedException {
     long monitorStartTime = System.currentTimeMillis();
     DAGStatus status = null;
@@ -238,7 +220,6 @@ public class TezJobMonitor {
     DAGStatus.State lastState = null;
     String lastReport = null;
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
-    Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
     long startTime = 0;
     boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
       Utilities.isPerfOrAboveLogging(conf);
@@ -255,7 +236,6 @@ public class TezJobMonitor {
         status = dagClient.getDAGStatus(opts, checkInterval);
         Map<String, Progress> progressMap = status.getVertexProgress();
         DAGStatus.State state = status.getState();
-        heartbeater.heartbeat();
 
         if (state != lastState || state == RUNNING) {
           lastState = state;

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 88fba58..3cb7439 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -191,7 +191,7 @@ public class TezTask extends Task<TezWork> {
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
-      rc = monitor.monitorExecution(dagClient, ctx.getHiveTxnManager(), conf, dag);
+      rc = monitor.monitorExecution(dagClient, conf, dag);
       if (rc != 0) {
         this.setException(new HiveException(monitor.getDiagnostics()));
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 7453145..e23a969 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -149,7 +149,7 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc, null);
+      returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 188e9a6..829a9f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -221,7 +221,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc, null);
+      returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 08e3d80..34c067a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -185,7 +185,7 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc, null);
+      returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 552367c..3617699 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.lockmgr;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
@@ -39,6 +42,13 @@ import org.apache.thrift.TException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * An implementation of HiveTxnManager that stores the transactions in the
@@ -65,7 +75,25 @@ public class DbTxnManager extends HiveTxnManagerImpl {
    */
   private int statementId = -1;
 
+  // ExecutorService for sending heartbeat to metastore periodically.
+  private static ScheduledExecutorService heartbeatExecutorService = null;
+  private ScheduledFuture<?> heartbeatTask = null;
+  private Runnable shutdownRunner = null;
+  static final int SHUTDOWN_HOOK_PRIORITY = 0;
+
   DbTxnManager() {
+    shutdownRunner = new Runnable() {
+      @Override
+      public void run() {
+        if (heartbeatExecutorService != null
+            && !heartbeatExecutorService.isShutdown()
+            && !heartbeatExecutorService.isTerminated()) {
+          LOG.info("Shutting down Heartbeater thread pool.");
+          heartbeatExecutorService.shutdown();
+        }
+      }
+    };
+    ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
   }
 
   @Override
@@ -104,6 +132,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   @Override
   public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
     acquireLocks(plan, ctx, username, true);
+    startHeartbeat();
   }
 
   /**
@@ -245,6 +274,25 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     ctx.setHiveLocks(locks);
     return lockState;
   }
+  /**
+   * This is for testing only.
+   * @param delay time to delay for first heartbeat
+   * @return null if no locks were needed
+   */
+  @VisibleForTesting
+  void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
+    acquireLocks(plan, ctx, username, true);
+    startHeartbeat(delay);
+  }
+
+
+  @Override
+  public void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
+    if (lockMgr != null) {
+      stopHeartbeat();
+      lockMgr.releaseLocks(hiveLocks);
+    }
+  }
 
   @Override
   public void commitTxn() throws LockException {
@@ -253,6 +301,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     }
     try {
       lockMgr.clearLocalLockRecords();
+      stopHeartbeat();
       LOG.debug("Committing txn " + JavaUtils.txnIdToString(txnId));
       client.commitTxn(txnId);
     } catch (NoSuchTxnException e) {
@@ -277,6 +326,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     }
     try {
       lockMgr.clearLocalLockRecords();
+      stopHeartbeat();
       LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId));
       client.rollbackTxn(txnId);
     } catch (NoSuchTxnException e) {
@@ -337,6 +387,31 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     }
   }
 
+  private void startHeartbeat() throws LockException {
+    startHeartbeat(0);
+  }
+
+  /**
+   *  This is for testing only.  Normally client should call {@link #startHeartbeat()}
+   *  Make the heartbeater start before an initial delay period.
+   *  @param delay time to delay before first execution, in milliseconds
+   */
+  void startHeartbeat(long delay) throws LockException {
+    long heartbeatInterval = getHeartbeatInterval(conf);
+    assert heartbeatInterval > 0;
+    heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
+        new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS);
+    LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " +
+        0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS);
+  }
+
+  private void stopHeartbeat() {
+    if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+      heartbeatTask.cancel(true);
+      heartbeatTask = null;
+    }
+  }
+
   @Override
   public ValidTxnList getValidTxns() throws LockException {
     init();
@@ -366,6 +441,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   @Override
   protected void destruct() {
     try {
+      stopHeartbeat();
+      if (shutdownRunner != null) {
+        ShutdownHookManager.removeShutdownHook(shutdownRunner);
+      }
       if (isTxnOpen()) rollbackTxn();
       if (lockMgr != null) lockMgr.close();
     } catch (Exception e) {
@@ -384,6 +463,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       try {
         Hive db = Hive.get(conf);
         client = db.getMSC();
+        initHeartbeatExecutorService();
       } catch (MetaException e) {
         throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
       } catch (HiveException e) {
@@ -391,6 +471,26 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       }
     }
   }
+
+  private synchronized void initHeartbeatExecutorService() {
+    if (heartbeatExecutorService != null
+        && !heartbeatExecutorService.isShutdown()
+        && !heartbeatExecutorService.isTerminated()) {
+      return;
+    }
+
+    int threadPoolSize = conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
+    heartbeatExecutorService =
+        Executors.newScheduledThreadPool(threadPoolSize, new ThreadFactory() {
+          private final AtomicInteger threadCounter = new AtomicInteger();
+          @Override
+          public Thread newThread(Runnable r) {
+            return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement());
+          }
+        });
+    ((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true);
+  }
+
   @Override
   public boolean isTxnOpen() {
     return txnId > 0;
@@ -403,4 +503,44 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   public int getStatementId() {
     return statementId;
   }
+
+  public static long getHeartbeatInterval(Configuration conf) throws LockException {
+    // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS),
+    // then divide it by 2 to give us a safety factor.
+    long interval =
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
+    if (interval == 0) {
+      throw new LockException(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set," +
+          " heartbeats won't be sent");
+    }
+    return interval;
+  }
+
+  /**
+   * Heartbeater thread
+   */
+  public static class Heartbeater implements Runnable {
+    private HiveTxnManager txnMgr;
+
+    /**
+     *
+     * @param txnMgr transaction manager for this operation
+     */
+    public Heartbeater(HiveTxnManager txnMgr) {
+      this.txnMgr = txnMgr;
+    }
+
+    /**
+     * Send a heartbeat to the metastore for locks and transactions.
+     */
+    @Override
+    public void run() {
+      try {
+        LOG.debug("Heartbeating...");
+        txnMgr.heartbeat();
+      } catch (LockException e) {
+        LOG.error("Failed trying to heartbeat " + e.getMessage());
+      }
+    }
+  }
 }
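
The essence of the change above: instead of MR/Tez job monitors calling the old Heartbeater.heartbeat() on every poll, each DbTxnManager now schedules its own heartbeat on a shared ScheduledExecutorService at half of hive.txn.timeout, cancels it from stopHeartbeat() on commit, rollback and releaseLocks, and shuts the pool down from a JVM shutdown hook. A standalone sketch of that scheduling pattern, using illustrative names rather than the patch's classes:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class HeartbeatSchedulingSketch {
      private static ScheduledExecutorService pool;
      private ScheduledFuture<?> task;

      // Like initHeartbeatExecutorService(): a small pool of named threads that
      // drops cancelled tasks from its work queue.
      static synchronized void initPool(int size) {
        if (pool != null && !pool.isShutdown() && !pool.isTerminated()) {
          return;
        }
        pool = Executors.newScheduledThreadPool(size, new ThreadFactory() {
          private final AtomicInteger counter = new AtomicInteger();
          @Override
          public Thread newThread(Runnable r) {
            return new Thread(r, "Heartbeater-" + counter.getAndIncrement());
          }
        });
        ((ScheduledThreadPoolExecutor) pool).setRemoveOnCancelPolicy(true);
      }

      // Like startHeartbeat(): beat at half the transaction timeout so a
      // heartbeat always arrives before the metastore times the txn out.
      void start(Runnable heartbeat, long txnTimeoutMs) {
        long intervalMs = txnTimeoutMs / 2;
        task = pool.scheduleAtFixedRate(heartbeat, 0, intervalMs, TimeUnit.MILLISECONDS);
      }

      // Like stopHeartbeat(): cancel once the locks/txn are done.
      void stop() {
        if (task != null && !task.isCancelled() && !task.isDone()) {
          task.cancel(true);
          task = null;
        }
      }

      public static void main(String[] args) throws InterruptedException {
        initPool(1);
        HeartbeatSchedulingSketch s = new HeartbeatSchedulingSketch();
        s.start(new Runnable() {
          @Override
          public void run() {
            System.out.println("heartbeat sent");
          }
        }, 2000);            // a 2s "timeout" means a beat roughly every second
        Thread.sleep(3500);  // expect about four beats
        s.stop();
        pool.shutdown();
      }
    }

In the patch itself the scheduled Runnable is DbTxnManager.Heartbeater, which calls txnMgr.heartbeat() and only logs a LockException instead of rethrowing it, and startHeartbeat() fails up front if hive.txn.timeout resolves to an interval of 0.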

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 2d30198..036fc24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -181,6 +181,15 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
+    // If there's no lock manager, it essentially means we didn't acquire locks in the first place,
+    // thus no need to release locks
+    if (lockMgr != null) {
+      lockMgr.releaseLocks(hiveLocks);
+    }
+  }
+
+  @Override
   public void commitTxn() throws LockException {
     // No-op
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 2bfc732..cb97d29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.plan.LockTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 
+import java.util.List;
+
 /**
  * An interface that allows Hive to manage transactions.  All classes
  * implementing this should extend {@link HiveTxnManagerImpl} rather than
@@ -67,6 +69,14 @@ public interface HiveTxnManager {
   void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException;
 
   /**
+   * Release specified locks.
+   * Transaction aware TxnManagers, which has {@code supportsAcid() == true},
+   * will track locks internally and ignore this parameter
+   * @param hiveLocks The list of locks to be released.
+   */
+  void releaseLocks(List<HiveLock> hiveLocks) throws LockException;
+
+  /**
    * Commit the current transaction.  This will release all locks obtained in
    * {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan,
    * org.apache.hadoop.hive.ql.Context, java.lang.String)}.
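
The contract above centralizes lock release in the transaction manager, which is what lets DbTxnManager pair releasing with stopHeartbeat() while DummyTxnManager simply forwards to its lock manager. A hedged sketch of the caller side, mirroring the Driver.java hunk earlier; the helper class and method names are made up for illustration:

    import java.util.List;

    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.lockmgr.LockException;

    public class ReleaseLocksExample {
      // Collect whatever locks the query context still holds, then hand them all
      // back to the txn manager rather than reaching into txnMgr.getLockManager().
      static void releaseQueryLocks(HiveTxnManager txnMgr, Context ctx,
          List<HiveLock> hiveLocks) throws LockException {
        if (ctx != null && ctx.getHiveLocks() != null) {
          hiveLocks.addAll(ctx.getHiveLocks());
        }
        txnMgr.releaseLocks(hiveLocks);
        hiveLocks.clear();
      }
    }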

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 68190c2..b20ce28 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -414,7 +414,7 @@ public class TestTxnCommands {
     runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5");
     //make sure currently running txn is considered aborted by housekeeper
     hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS);
-    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS);
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS);
     AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
     //this will abort the txn
     houseKeeperService.start(hiveConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index f82b85a..88b379c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -33,9 +33,6 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
-import org.apache.log4j.Level;
-import org.slf4j.LoggerFactory;
-import static org.hamcrest.CoreMatchers.is;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -202,6 +199,7 @@ public class TestDbTxnManager {
     addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
     txnMgr.openTxn("NicholasII");
+    Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     LockException exception = null;
     try {
@@ -212,8 +210,10 @@ public class TestDbTxnManager {
     }
     Assert.assertNotNull("Expected exception1", exception);
     Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
+
     exception = null;
     txnMgr.openTxn("AlexanderIII");
+    Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     try {
       txnMgr.rollbackTxn();
@@ -223,20 +223,6 @@ public class TestDbTxnManager {
     }
     Assert.assertNotNull("Expected exception2", exception);
     Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_NO_SUCH_TRANSACTION, exception.getCanonicalErrorMsg());
-    exception = null;
-    txnMgr.openTxn("PeterI");
-    txnMgr.acquireLocks(qp, ctx, "PeterI");
-    List<HiveLock> locks = ctx.getHiveLocks();
-    Assert.assertThat("Unexpected lock count", locks.size(), is(1));
-    runReaper();
-    try {
-      txnMgr.heartbeat();
-    }
-    catch(LockException ex) {
-      exception = ex;
-    }
-    Assert.assertNotNull("Expected exception3", exception);
-    Assert.assertEquals("Wrong Exception3", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
   }
 
   @Test
@@ -247,12 +233,12 @@ public class TestDbTxnManager {
     expireLocks(txnMgr, 0);
     //create a few read locks, all on the same resource
     for(int i = 0; i < 5; i++) {
-      txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+      ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
     }
     expireLocks(txnMgr, 5);
     //create a lot of locks
     for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
-      txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+      ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
     }
     expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
   }
@@ -260,6 +246,7 @@ public class TestDbTxnManager {
     DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
     ShowLocksResponse resp = lockManager.getLocks();
     Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size());
+    Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     resp = lockManager.getLocks();
     Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
@@ -381,6 +368,70 @@ public class TestDbTxnManager {
     Assert.assertTrue(sawException);
   }
 
+  @Test
+  public void testLockAcquisitionAndRelease() throws Exception {
+    addTableInput();
+    QueryPlan qp = new MockQueryPlan(this);
+    txnMgr.acquireLocks(qp, ctx, "fred");
+    List<HiveLock> locks = ctx.getHiveLocks();
+    Assert.assertEquals(1, locks.size());
+    txnMgr.releaseLocks(locks);
+    locks = txnMgr.getLockManager().getLocks(false, false);
+    Assert.assertEquals(0, locks.size());
+  }
+
+  @Test
+  public void testHeartbeater() throws Exception {
+    Assert.assertTrue(txnMgr instanceof DbTxnManager);
+
+    addTableInput();
+    LockException exception = null;
+    QueryPlan qp = new MockQueryPlan(this);
+
+    // Case 1: If there's no delay for the heartbeat, txn should be able to commit
+    txnMgr.openTxn("fred");
+    txnMgr.acquireLocks(qp, ctx, "fred"); // heartbeat started..
+    runReaper();
+    try {
+      txnMgr.commitTxn();
+    } catch (LockException e) {
+      exception = e;
+    }
+    Assert.assertNull("Txn commit should be successful", exception);
+    exception = null;
+
+    // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
+    //         then txt should be able to commit
+    txnMgr.openTxn("tom");
+    // Start the heartbeat after a delay, which is shorter than  the HIVE_TXN_TIMEOUT
+    ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "tom",
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
+    runReaper();
+    try {
+      txnMgr.commitTxn();
+    } catch (LockException e) {
+      exception = e;
+    }
+    Assert.assertNull("Txn commit should also be successful", exception);
+    exception = null;
+
+    // Case 3: If there's delay for the heartbeat, and the delay is long enough to trigger the reaper,
+    //         then the txn will time out and be aborted.
+    //         Here we just don't send the heartbeat at all - an infinite delay.
+    txnMgr.openTxn("jerry");
+    // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
+    ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "jerry", true);
+    Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
+    runReaper();
+    try {
+      txnMgr.commitTxn();
+    } catch (LockException e) {
+      exception = e;
+    }
+    Assert.assertNotNull("Txn should have been aborted", exception);
+    Assert.assertEquals(ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
+  }
+
   @Before
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();
@@ -391,7 +442,7 @@ public class TestDbTxnManager {
     readEntities = new HashSet<ReadEntity>();
     writeEntities = new HashSet<WriteEntity>();
     conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS);
-    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS);
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
     houseKeeperService = new AcidHouseKeeperService();
   }