You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/01/16 21:31:40 UTC
hive git commit: HIVE-12366 Refactor Heartbeater logic for
transaction (Wei Zheng via Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master f3ea7773b -> aa0f8e062
HIVE-12366 Refactor Heartbeater logic for transaction (Wei Zheng via Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa0f8e06
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa0f8e06
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa0f8e06
Branch: refs/heads/master
Commit: aa0f8e062827245b05c353d02537e51b9957bf36
Parents: f3ea777
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Jan 16 12:31:29 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Jan 16 12:31:29 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 4 +-
.../apache/hadoop/hive/ql/exec/Heartbeater.java | 90 ------------
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 2 +-
.../hive/ql/exec/mr/HadoopJobExecHelper.java | 16 +--
.../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 22 +--
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 2 +-
.../hadoop/hive/ql/io/merge/MergeFileTask.java | 2 +-
.../ql/io/rcfile/stats/PartialScanTask.java | 2 +-
.../io/rcfile/truncate/ColumnTruncateTask.java | 2 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 140 +++++++++++++++++++
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 9 ++
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 10 ++
.../apache/hadoop/hive/ql/TestTxnCommands.java | 2 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 91 +++++++++---
15 files changed, 244 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1e8c34b..2c25cae 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -173,6 +173,7 @@ public class HiveConf extends Configuration {
HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
HiveConf.ConfVars.HIVE_TXN_MANAGER,
HiveConf.ConfVars.HIVE_TXN_TIMEOUT,
+ HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE,
HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH,
HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION,
HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED,
@@ -1508,6 +1509,8 @@ public class HiveConf extends Configuration {
"no transactions."),
HIVE_TXN_TIMEOUT("hive.txn.timeout", "300s", new TimeValidator(TimeUnit.SECONDS),
"time after which transactions are declared aborted if the client has not sent a heartbeat."),
+ HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE("hive.txn.heartbeat.threadpool.size", 5, "The number of " +
+ "threads to use for heartbeating. For Hive CLI, 1 is enough. For HiveServer2, we need a few"),
TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT("hive.txn.manager.dump.lock.state.on.acquire.timeout", false,
"Set this to true so that when attempt to acquire a lock on resource times out, the current state" +
" of the lock manager is dumped to log file. This is for debugging. See also " +
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 9bff08f..020f037 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1026,9 +1026,7 @@ public class Driver implements CommandProcessor {
if (ctx != null && ctx.getHiveLocks() != null) {
hiveLocks.addAll(ctx.getHiveLocks());
}
- if (!hiveLocks.isEmpty()) {
- txnMgr.getLockManager().releaseLocks(hiveLocks);
- }
+ txnMgr.releaseLocks(hiveLocks);
}
hiveLocks.clear();
if (ctx != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
deleted file mode 100644
index ff64563..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.LockException;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Class to handle heartbeats for MR and Tez tasks.
- */
-public class Heartbeater {
- private long lastHeartbeat = 0;
- private long heartbeatInterval = 0;
- private boolean dontHeartbeat = false;
- private HiveTxnManager txnMgr;
- private Configuration conf;
-
- static final private Logger LOG = LoggerFactory.getLogger(Heartbeater.class.getName());
-
- /**
- *
- * @param txnMgr transaction manager for this operation
- * @param conf Configuration for this operation
- */
- public Heartbeater(HiveTxnManager txnMgr, Configuration conf) {
- this.txnMgr = txnMgr;
- this.conf = conf;
- }
-
- /**
- * Send a heartbeat to the metastore for locks and transactions.
- * @throws IOException
- */
- public void heartbeat() throws IOException {
- if (dontHeartbeat) return;
-
- if (txnMgr == null) {
- LOG.debug("txnMgr null, not heartbeating");
- dontHeartbeat = true;
- return;
- }
-
- if (heartbeatInterval == 0) {
- // Multiply the heartbeat interval by 1000 to convert to milliseconds,
- // but divide by 2 to give us a safety factor.
- heartbeatInterval = HiveConf.getTimeVar(
- conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
- if (heartbeatInterval == 0) {
- LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent");
- dontHeartbeat = true;
- LOG.debug("heartbeat interval 0, not heartbeating");
- return;
- }
- }
- long now = System.currentTimeMillis();
- if (now - lastHeartbeat > heartbeatInterval) {
- try {
- LOG.debug("heartbeating");
- txnMgr.heartbeat();
- } catch (LockException e) {
- LOG.warn("Failed trying to heartbeat " + e.getMessage());
- throw new IOException(e);
- }
- lastHeartbeat = now;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index ab7fd93..472e8ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -429,7 +429,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
- returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager());
+ returnVal = jobExecHelper.progress(rj, jc);
success = (returnVal == 0);
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 5f35630..1b296b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -33,13 +33,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.MapRedStats;
-import org.apache.hadoop.hive.ql.exec.Heartbeater;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskHandle;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -230,14 +228,12 @@ public class HadoopJobExecHelper {
int numReduce = -1;
List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);
- Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job);
while (!rj.isComplete()) {
try {
Thread.sleep(pullInterval);
} catch (InterruptedException e) {
}
- heartbeater.heartbeat();
if (initializing && rj.getJobState() == JobStatus.PREP) {
// No reason to poll untill the job is initialized
@@ -451,7 +447,6 @@ public class HadoopJobExecHelper {
private static class ExecDriverTaskHandle extends TaskHandle {
JobClient jc;
RunningJob rj;
- HiveTxnManager txnMgr;
JobClient getJobClient() {
return jc;
@@ -461,14 +456,9 @@ public class HadoopJobExecHelper {
return rj;
}
- HiveTxnManager getTxnManager() {
- return txnMgr;
- }
-
- public ExecDriverTaskHandle(JobClient jc, RunningJob rj, HiveTxnManager txnMgr) {
+ public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
this.jc = jc;
this.rj = rj;
- this.txnMgr = txnMgr;
}
public void setRunningJob(RunningJob job) {
@@ -522,7 +512,7 @@ public class HadoopJobExecHelper {
}
- public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws IOException {
+ public int progress(RunningJob rj, JobClient jc) throws IOException {
jobId = rj.getID();
int returnVal = 0;
@@ -543,7 +533,7 @@ public class HadoopJobExecHelper {
runningJobs.add(rj);
- ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, txnMgr);
+ ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
jobInfo(rj);
MapRedStats mapRedStats = progress(th);
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index e81b73d..479bc93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -39,11 +39,9 @@ import java.util.TreeSet;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Heartbeater;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -63,21 +61,6 @@ import org.fusesource.jansi.Ansi;
import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
/**
* TezJobMonitor keeps track of a tez job while it's being executed. It will
* print status to the console and retrieve final status of the job after
@@ -220,11 +203,10 @@ public class TezJobMonitor {
* monitorExecution handles status printing, failures during execution and final status retrieval.
*
* @param dagClient client that was used to kick off the job
- * @param txnMgr transaction manager for this operation
* @param conf configuration file for this operation
* @return int 0 - success, 1 - killed, 2 - failed
*/
- public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf,
+ public int monitorExecution(final DAGClient dagClient, HiveConf conf,
DAG dag) throws InterruptedException {
long monitorStartTime = System.currentTimeMillis();
DAGStatus status = null;
@@ -238,7 +220,6 @@ public class TezJobMonitor {
DAGStatus.State lastState = null;
String lastReport = null;
Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
- Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
long startTime = 0;
boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
Utilities.isPerfOrAboveLogging(conf);
@@ -255,7 +236,6 @@ public class TezJobMonitor {
status = dagClient.getDAGStatus(opts, checkInterval);
Map<String, Progress> progressMap = status.getVertexProgress();
DAGStatus.State state = status.getState();
- heartbeater.heartbeat();
if (state != lastState || state == RUNNING) {
lastState = state;
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 88fba58..3cb7439 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -191,7 +191,7 @@ public class TezTask extends Task<TezWork> {
// finally monitor will print progress until the job is done
TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
- rc = monitor.monitorExecution(dagClient, ctx.getHiveTxnManager(), conf, dag);
+ rc = monitor.monitorExecution(dagClient, conf, dag);
if (rc != 0) {
this.setException(new HiveException(monitor.getDiagnostics()));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 7453145..e23a969 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -149,7 +149,7 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
- returnVal = jobExecHelper.progress(rj, jc, null);
+ returnVal = jobExecHelper.progress(rj, jc);
success = (returnVal == 0);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 188e9a6..829a9f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -221,7 +221,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
- returnVal = jobExecHelper.progress(rj, jc, null);
+ returnVal = jobExecHelper.progress(rj, jc);
success = (returnVal == 0);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 08e3d80..34c067a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -185,7 +185,7 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
- returnVal = jobExecHelper.progress(rj, jc, null);
+ returnVal = jobExecHelper.progress(rj, jc);
success = (returnVal == 0);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 552367c..3617699 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.lockmgr;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.JavaUtils;
@@ -39,6 +42,13 @@ import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* An implementation of HiveTxnManager that stores the transactions in the
@@ -65,7 +75,25 @@ public class DbTxnManager extends HiveTxnManagerImpl {
*/
private int statementId = -1;
+ // ExecutorService for sending heartbeat to metastore periodically.
+ private static ScheduledExecutorService heartbeatExecutorService = null;
+ private ScheduledFuture<?> heartbeatTask = null;
+ private Runnable shutdownRunner = null;
+ static final int SHUTDOWN_HOOK_PRIORITY = 0;
+
DbTxnManager() {
+ shutdownRunner = new Runnable() {
+ @Override
+ public void run() {
+ if (heartbeatExecutorService != null
+ && !heartbeatExecutorService.isShutdown()
+ && !heartbeatExecutorService.isTerminated()) {
+ LOG.info("Shutting down Heartbeater thread pool.");
+ heartbeatExecutorService.shutdown();
+ }
+ }
+ };
+ ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
}
@Override
@@ -104,6 +132,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
acquireLocks(plan, ctx, username, true);
+ startHeartbeat();
}
/**
@@ -245,6 +274,25 @@ public class DbTxnManager extends HiveTxnManagerImpl {
ctx.setHiveLocks(locks);
return lockState;
}
+ /**
+ * This is for testing only.
+ * @param delay time in milliseconds to delay before the first heartbeat
+ */
+ @VisibleForTesting
+ void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
+ acquireLocks(plan, ctx, username, true);
+ startHeartbeat(delay);
+ }
+
+
+ @Override
+ public void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
+ if (lockMgr != null) {
+ stopHeartbeat();
+ lockMgr.releaseLocks(hiveLocks);
+ }
+ }
@Override
public void commitTxn() throws LockException {
@@ -253,6 +301,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
try {
lockMgr.clearLocalLockRecords();
+ stopHeartbeat();
LOG.debug("Committing txn " + JavaUtils.txnIdToString(txnId));
client.commitTxn(txnId);
} catch (NoSuchTxnException e) {
@@ -277,6 +326,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
try {
lockMgr.clearLocalLockRecords();
+ stopHeartbeat();
LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId));
client.rollbackTxn(txnId);
} catch (NoSuchTxnException e) {
@@ -337,6 +387,31 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
}
+ private void startHeartbeat() throws LockException {
+ startHeartbeat(0);
+ }
+
+ /**
+ * This is for testing only. Normally client should call {@link #startHeartbeat()}
+ * Make the heartbeater start after an initial delay period.
+ * @param delay time to delay before first execution, in milliseconds
+ */
+ void startHeartbeat(long delay) throws LockException {
+ long heartbeatInterval = getHeartbeatInterval(conf);
+ assert heartbeatInterval > 0;
+ heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
+ new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS);
+ LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " +
+ 0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS);
+ }
+
+ private void stopHeartbeat() {
+ if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+ heartbeatTask.cancel(true);
+ heartbeatTask = null;
+ }
+ }
+
@Override
public ValidTxnList getValidTxns() throws LockException {
init();
@@ -366,6 +441,10 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
protected void destruct() {
try {
+ stopHeartbeat();
+ if (shutdownRunner != null) {
+ ShutdownHookManager.removeShutdownHook(shutdownRunner);
+ }
if (isTxnOpen()) rollbackTxn();
if (lockMgr != null) lockMgr.close();
} catch (Exception e) {
@@ -384,6 +463,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
try {
Hive db = Hive.get(conf);
client = db.getMSC();
+ initHeartbeatExecutorService();
} catch (MetaException e) {
throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
} catch (HiveException e) {
@@ -391,6 +471,26 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
}
}
+
+ private synchronized void initHeartbeatExecutorService() {
+ if (heartbeatExecutorService != null
+ && !heartbeatExecutorService.isShutdown()
+ && !heartbeatExecutorService.isTerminated()) {
+ return;
+ }
+
+ int threadPoolSize = conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
+ heartbeatExecutorService =
+ Executors.newScheduledThreadPool(threadPoolSize, new ThreadFactory() {
+ private final AtomicInteger threadCounter = new AtomicInteger();
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement());
+ }
+ });
+ ((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true);
+ }
+
@Override
public boolean isTxnOpen() {
return txnId > 0;
@@ -403,4 +503,44 @@ public class DbTxnManager extends HiveTxnManagerImpl {
public int getStatementId() {
return statementId;
}
+
+ public static long getHeartbeatInterval(Configuration conf) throws LockException {
+ // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS),
+ // then divide it by 2 to give us a safety factor.
+ long interval =
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
+ if (interval == 0) {
+ throw new LockException(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set," +
+ " heartbeats won't be sent");
+ }
+ return interval;
+ }
+
+ /**
+ * Heartbeater thread
+ */
+ public static class Heartbeater implements Runnable {
+ private HiveTxnManager txnMgr;
+
+ /**
+ *
+ * @param txnMgr transaction manager for this operation
+ */
+ public Heartbeater(HiveTxnManager txnMgr) {
+ this.txnMgr = txnMgr;
+ }
+
+ /**
+ * Send a heartbeat to the metastore for locks and transactions.
+ */
+ @Override
+ public void run() {
+ try {
+ LOG.debug("Heartbeating...");
+ txnMgr.heartbeat();
+ } catch (LockException e) {
+ LOG.error("Failed trying to heartbeat " + e.getMessage());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 2d30198..036fc24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -181,6 +181,15 @@ class DummyTxnManager extends HiveTxnManagerImpl {
}
@Override
+ public void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
+ // If there's no lock manager, it essentially means we didn't acquire locks in the first place,
+ // thus no need to release locks
+ if (lockMgr != null) {
+ lockMgr.releaseLocks(hiveLocks);
+ }
+ }
+
+ @Override
public void commitTxn() throws LockException {
// No-op
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 2bfc732..cb97d29 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.plan.LockTableDesc;
import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import java.util.List;
+
/**
* An interface that allows Hive to manage transactions. All classes
* implementing this should extend {@link HiveTxnManagerImpl} rather than
@@ -67,6 +69,14 @@ public interface HiveTxnManager {
void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException;
/**
+ * Release specified locks.
+ * Transaction aware TxnManagers, which have {@code supportsAcid() == true},
+ * will track locks internally and ignore this parameter
+ * @param hiveLocks The list of locks to be released.
+ */
+ void releaseLocks(List<HiveLock> hiveLocks) throws LockException;
+
+ /**
* Commit the current transaction. This will release all locks obtained in
* {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan,
* org.apache.hadoop.hive.ql.Context, java.lang.String)}.
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 68190c2..b20ce28 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -414,7 +414,7 @@ public class TestTxnCommands {
runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5");
//make sure currently running txn is considered aborted by housekeeper
hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS);
- hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS);
+ hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, TimeUnit.MILLISECONDS);
AcidHouseKeeperService houseKeeperService = new AcidHouseKeeperService();
//this will abort the txn
houseKeeperService.start(hiveConf);
http://git-wip-us.apache.org/repos/asf/hive/blob/aa0f8e06/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index f82b85a..88b379c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -33,9 +33,6 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
-import org.apache.log4j.Level;
-import org.slf4j.LoggerFactory;
-import static org.hamcrest.CoreMatchers.is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -202,6 +199,7 @@ public class TestDbTxnManager {
addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
QueryPlan qp = new MockQueryPlan(this);
txnMgr.openTxn("NicholasII");
+ Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
LockException exception = null;
try {
@@ -212,8 +210,10 @@ public class TestDbTxnManager {
}
Assert.assertNotNull("Expected exception1", exception);
Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
+
exception = null;
txnMgr.openTxn("AlexanderIII");
+ Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
try {
txnMgr.rollbackTxn();
@@ -223,20 +223,6 @@ public class TestDbTxnManager {
}
Assert.assertNotNull("Expected exception2", exception);
Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_NO_SUCH_TRANSACTION, exception.getCanonicalErrorMsg());
- exception = null;
- txnMgr.openTxn("PeterI");
- txnMgr.acquireLocks(qp, ctx, "PeterI");
- List<HiveLock> locks = ctx.getHiveLocks();
- Assert.assertThat("Unexpected lock count", locks.size(), is(1));
- runReaper();
- try {
- txnMgr.heartbeat();
- }
- catch(LockException ex) {
- exception = ex;
- }
- Assert.assertNotNull("Expected exception3", exception);
- Assert.assertEquals("Wrong Exception3", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
}
@Test
@@ -247,12 +233,12 @@ public class TestDbTxnManager {
expireLocks(txnMgr, 0);
//create a few read locks, all on the same resource
for(int i = 0; i < 5; i++) {
- txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+ ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
}
expireLocks(txnMgr, 5);
//create a lot of locks
for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
- txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+ ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
}
expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
}
@@ -260,6 +246,7 @@ public class TestDbTxnManager {
DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
ShowLocksResponse resp = lockManager.getLocks();
Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size());
+ Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
resp = lockManager.getLocks();
Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
@@ -381,6 +368,70 @@ public class TestDbTxnManager {
Assert.assertTrue(sawException);
}
+ @Test
+ public void testLockAcquisitionAndRelease() throws Exception {
+ addTableInput();
+ QueryPlan qp = new MockQueryPlan(this);
+ txnMgr.acquireLocks(qp, ctx, "fred");
+ List<HiveLock> locks = ctx.getHiveLocks();
+ Assert.assertEquals(1, locks.size());
+ txnMgr.releaseLocks(locks);
+ locks = txnMgr.getLockManager().getLocks(false, false);
+ Assert.assertEquals(0, locks.size());
+ }
+
+ @Test
+ public void testHeartbeater() throws Exception {
+ Assert.assertTrue(txnMgr instanceof DbTxnManager);
+
+ addTableInput();
+ LockException exception = null;
+ QueryPlan qp = new MockQueryPlan(this);
+
+ // Case 1: If there's no delay for the heartbeat, txn should be able to commit
+ txnMgr.openTxn("fred");
+ txnMgr.acquireLocks(qp, ctx, "fred"); // heartbeat started..
+ runReaper();
+ try {
+ txnMgr.commitTxn();
+ } catch (LockException e) {
+ exception = e;
+ }
+ Assert.assertNull("Txn commit should be successful", exception);
+ exception = null;
+
+ // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
+ // then txn should be able to commit
+ txnMgr.openTxn("tom");
+ // Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT
+ ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "tom",
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
+ runReaper();
+ try {
+ txnMgr.commitTxn();
+ } catch (LockException e) {
+ exception = e;
+ }
+ Assert.assertNull("Txn commit should also be successful", exception);
+ exception = null;
+
+ // Case 3: If there's delay for the heartbeat, and the delay is long enough to trigger the reaper,
+ // then the txn will time out and be aborted.
+ // Here we just don't send the heartbeat at all - an infinite delay.
+ txnMgr.openTxn("jerry");
+ // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
+ ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "jerry", true);
+ Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
+ runReaper();
+ try {
+ txnMgr.commitTxn();
+ } catch (LockException e) {
+ exception = e;
+ }
+ Assert.assertNotNull("Txn should have been aborted", exception);
+ Assert.assertEquals(ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
+ }
+
@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();
@@ -391,7 +442,7 @@ public class TestDbTxnManager {
readEntities = new HashSet<ReadEntity>();
writeEntities = new HashSet<WriteEntity>();
conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS);
- conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS);
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
houseKeeperService = new AcidHouseKeeperService();
}