You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/05/20 16:50:58 UTC

[1/3] hive git commit: HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/master 360dfa0ff -> 259e8be1d


http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
new file mode 100644
index 0000000..2804e21
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -0,0 +1,1484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+/**
+ * Tests for TxnHandler.
+ */
+public class TestTxnHandler {
+  static final private String CLASS_NAME = TxnHandler.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  private HiveConf conf = new HiveConf();
+  private TxnStore txnHandler;
+
+  public TestTxnHandler() throws Exception {
+    TxnDbUtil.setConfValues(conf);
+    LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
+    Configuration conf = ctx.getConfiguration();
+    conf.getLoggerConfig(CLASS_NAME).setLevel(Level.DEBUG);
+    ctx.updateLoggers(conf);
+    tearDown();
+  }
+
+  @Test
+  public void testValidTxnsEmpty() throws Exception {
+    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+    assertEquals(0L, txnsInfo.getTxn_high_water_mark());
+    assertTrue(txnsInfo.getOpen_txns().isEmpty());
+    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+    assertEquals(0L, txns.getTxn_high_water_mark());
+    assertTrue(txns.getOpen_txns().isEmpty());
+  }
+
+  @Test
+  public void testOpenTxn() throws Exception {
+    long first = openTxn();
+    assertEquals(1L, first);
+    long second = openTxn();
+    assertEquals(2L, second);
+    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+    assertEquals(2L, txnsInfo.getTxn_high_water_mark());
+    assertEquals(2, txnsInfo.getOpen_txns().size());
+    assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
+    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(0).getState());
+    assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
+    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
+    assertEquals("me", txnsInfo.getOpen_txns().get(1).getUser());
+    assertEquals("localhost", txnsInfo.getOpen_txns().get(1).getHostname());
+
+    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+    assertEquals(2L, txns.getTxn_high_water_mark());
+    assertEquals(2, txns.getOpen_txns().size());
+    boolean[] saw = new boolean[3];
+    for (int i = 0; i < saw.length; i++) saw[i] = false;
+    for (Long tid : txns.getOpen_txns()) {
+      saw[tid.intValue()] = true;
+    }
+    for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
+  }
+
+  @Test
+  public void testAbortTxn() throws Exception {
+    OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost"));
+    List<Long> txnList = openedTxns.getTxn_ids();
+    long first = txnList.get(0);
+    assertEquals(1L, first);
+    long second = txnList.get(1);
+    assertEquals(2L, second);
+    txnHandler.abortTxn(new AbortTxnRequest(1));
+    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+    assertEquals(2L, txnsInfo.getTxn_high_water_mark());
+    assertEquals(2, txnsInfo.getOpen_txns().size());
+    assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
+    assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
+    assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
+    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
+
+    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+    assertEquals(2L, txns.getTxn_high_water_mark());
+    assertEquals(2, txns.getOpen_txns().size());
+    boolean[] saw = new boolean[3];
+    for (int i = 0; i < saw.length; i++) saw[i] = false;
+    for (Long tid : txns.getOpen_txns()) {
+      saw[tid.intValue()] = true;
+    }
+    for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
+  }
+
+  @Test
+  public void testAbortInvalidTxn() throws Exception {
+    boolean caught = false;
+    try {
+      txnHandler.abortTxn(new AbortTxnRequest(195L));
+    } catch (NoSuchTxnException e) {
+      caught = true;
+    }
+    assertTrue(caught);
+  }
+
+  @Test
+  public void testValidTxnsNoneOpen() throws Exception {
+    txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost"));
+    txnHandler.commitTxn(new CommitTxnRequest(1));
+    txnHandler.commitTxn(new CommitTxnRequest(2));
+    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+    assertEquals(2L, txnsInfo.getTxn_high_water_mark());
+    assertEquals(0, txnsInfo.getOpen_txns().size());
+    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+    assertEquals(2L, txns.getTxn_high_water_mark());
+    assertEquals(0, txns.getOpen_txns().size());
+  }
+
+  @Test
+  public void testValidTxnsSomeOpen() throws Exception {
+    txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
+    txnHandler.abortTxn(new AbortTxnRequest(1));
+    txnHandler.commitTxn(new CommitTxnRequest(2));
+    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+    assertEquals(3L, txnsInfo.getTxn_high_water_mark());
+    assertEquals(2, txnsInfo.getOpen_txns().size());
+    assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
+    assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
+    assertEquals(3L, txnsInfo.getOpen_txns().get(1).getId());
+    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
+
+    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+    assertEquals(3L, txns.getTxn_high_water_mark());
+    assertEquals(2, txns.getOpen_txns().size());
+    boolean[] saw = new boolean[4];
+    for (int i = 0; i < saw.length; i++) saw[i] = false;
+    for (Long tid : txns.getOpen_txns()) {
+      saw[tid.intValue()] = true;
+    }
+    assertTrue(saw[1]);
+    assertFalse(saw[2]);
+    assertTrue(saw[3]);
+  }
+
+  @Test
+  public void testLockDifferentDBs() throws Exception {
+    // Test that two different databases don't collide on their locks
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testLockSameDB() throws Exception {
+    // Test that a second exclusive lock on the same database has to wait for the first
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockDbLocksTable() throws Exception {
+    // Test that locking a database prevents locking of tables in the database
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    comp.setTablename("mytable");
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockDbDoesNotLockTableInDifferentDB() throws Exception {
+    // Test that locking a database does not prevent locking of tables in a different database
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    comp.setTablename("mytable");
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testLockDifferentTables() throws Exception {
+    // Test that two different tables don't collide on their locks
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    comp.setTablename("mytable");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    comp.setTablename("yourtable");
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testLockSameTable() throws Exception {
+    // Test that a second exclusive lock on the same table has to wait for the first
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockTableLocksPartition() throws Exception {
+    // Test that locking a table prevents locking of partitions of the table
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockDifferentTableDoesntLockPartition() throws Exception {
+    // Test that locking a table does not prevent locking of partitions of a different table
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("yourtable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testLockDifferentPartitions() throws Exception {
+    // Test that two different partitions don't collide on their locks
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("yourpartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testLockSamePartition() throws Exception {
+    // Test that a second exclusive lock on the same partition has to wait for the first
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockSRSR() throws Exception {
+    // Test that two shared read locks can share a partition
+    LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.INSERT);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.SELECT);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testLockESRSR() throws Exception {
+    // Test that exclusive lock blocks shared reads
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.INSERT);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.SELECT);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockSRSW() throws Exception {
+    // Test that write can acquire after read
+    LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.INSERT);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testLockESRSW() throws Exception {
+    // Test that exclusive lock blocks read and write
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.SELECT);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.UPDATE);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockSRE() throws Exception {
+    // Test that read blocks exclusive
+    LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.SELECT);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockESRE() throws Exception {
+    // Test that exclusive blocks read and exclusive
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.SELECT);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockSWSR() throws Exception {
+    // Test that read can acquire after write
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.UPDATE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.SELECT);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testLockSWSWSR() throws Exception {
+    // Test that write blocks write but read can still acquire
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.UPDATE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.INSERT);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testWrongLockForOperation() throws Exception {
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    Exception expectedError = null;
+    try {
+      LockResponse res = txnHandler.lock(req);
+    }
+    catch(Exception e) {
+      expectedError = e;
+    }
+    Assert.assertTrue(expectedError != null && expectedError.getMessage().contains("Unexpected DataOperationType"));
+  }
+  @Test
+  public void testLockSWSWSW() throws Exception {
+    // Test that write blocks two writes
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockEESW() throws Exception {
+    // Test that exclusive blocks exclusive and write
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testLockEESR() throws Exception {
+    // Test that exclusive blocks exclusive and read
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.SELECT);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.WAITING);
+  }
+
+  @Test
+  public void testCheckLockAcquireAfterWaiting() throws Exception {
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    long txnId = openTxn();
+    req.setTxnid(txnId);
+    LockResponse res = txnHandler.lock(req);
+    long lockid1 = res.getLockid();
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.UPDATE);
+    components.clear();
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
+    res = txnHandler.lock(req);
+    long lockid2 = res.getLockid();
+    assertTrue(res.getState() == LockState.WAITING);
+
+    txnHandler.abortTxn(new AbortTxnRequest(txnId));
+    res = txnHandler.checkLock(new CheckLockRequest(lockid2));
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testCheckLockNoSuchLock() throws Exception {
+    try {
+      txnHandler.checkLock(new CheckLockRequest(23L));
+      fail("Allowed to check lock on non-existent lock");
+    } catch (NoSuchLockException e) {
+    }
+  }
+
+  @Test
+  public void testCheckLockTxnAborted() throws Exception {
+    // Test that checking a lock fails once its transaction has been aborted
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    long lockid = res.getLockid();
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    try {
+      // This will throw NoSuchLockException (even though it's the
+      // transaction we've closed) because that will have deleted the lock.
+      txnHandler.checkLock(new CheckLockRequest(lockid));
+      fail("Allowed to check lock on aborted transaction.");
+    } catch (NoSuchLockException e) {
+    }
+  }
+
+  @Test
+  public void testMultipleLock() throws Exception {
+    // Test more than one lock can be handled in a lock request
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(2);
+    components.add(comp);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("anotherpartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    long lockid = res.getLockid();
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    res = txnHandler.checkLock(new CheckLockRequest(lockid));
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    txnHandler.unlock(new UnlockRequest(lockid));
+    assertEquals(0, txnHandler.numLocksInLockTable());
+  }
+
+  @Test
+  public void testMultipleLockWait() throws Exception {
+    // Test that a lock blocked by a multi-component lock is acquired after the unlock
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(2);
+    components.add(comp);
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("anotherpartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    long lockid1 = res.getLockid();
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+
+    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    res = txnHandler.lock(req);
+    long lockid2 = res.getLockid();
+    assertTrue(res.getState() == LockState.WAITING);
+
+    txnHandler.unlock(new UnlockRequest(lockid1));
+
+    res = txnHandler.checkLock(new CheckLockRequest(lockid2));
+    assertTrue(res.getState() == LockState.ACQUIRED);
+  }
+
+  @Test
+  public void testUnlockOnCommit() throws Exception {
+    // Test that committing unlocks
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,  "mydb");
+    comp.setTablename("mytable");
+    comp.setOperationType(DataOperationType.DELETE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+    assertEquals(0, txnHandler.numLocksInLockTable());
+  }
+
+  @Test
+  public void testUnlockOnAbort() throws Exception {
+    // Test that aborting unlocks
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.UPDATE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    assertEquals(0, txnHandler.numLocksInLockTable());
+  }
+
+  @Test
+  public void testUnlockWithTxn() throws Exception {
+    LOG.debug("Starting testUnlockWithTxn");
+    // Test that attempting to unlock locks associated with a transaction
+    // generates an error
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.DELETE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    long lockid = res.getLockid();
+    try {
+      txnHandler.unlock(new UnlockRequest(lockid));
+      fail("Allowed to unlock lock associated with transaction.");
+    } catch (TxnOpenException e) {
+    }
+  }
+
+  @Test
+  public void testHeartbeatTxnAborted() throws Exception {
+    // Test that when a transaction is aborted, the heartbeat fails
+    openTxn();
+    txnHandler.abortTxn(new AbortTxnRequest(1));
+    HeartbeatRequest h = new HeartbeatRequest();
+    h.setTxnid(1);
+    try {
+      txnHandler.heartbeat(h);
+      fail("Told there was a txn, when it should have been aborted.");
+    } catch (TxnAbortedException e) {
+    }
+ }
+
+  @Test
+  public void testHeartbeatNoTxn() throws Exception {
+    // Test that heartbeating a transaction that does not exist fails
+    HeartbeatRequest h = new HeartbeatRequest();
+    h.setTxnid(939393L);
+    try {
+      txnHandler.heartbeat(h);
+      fail("Told there was a txn, when there wasn't.");
+    } catch (NoSuchTxnException e) {
+    }
+  }
+
+  @Test
+  public void testHeartbeatLock() throws Exception {
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
+    HeartbeatRequest h = new HeartbeatRequest();
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setTablename("mytable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    h.setLockid(res.getLockid());
+    for (int i = 0; i < 30; i++) {
+      try {
+        txnHandler.heartbeat(h);
+      } catch (NoSuchLockException e) {
+        fail("Told there was no lock, when the heartbeat should have kept it.");
+      }
+    }
+  }
+
+  @Test
+  public void heartbeatTxnRange() throws Exception {
+    long txnid = openTxn();
+    assertEquals(1, txnid);
+    txnid = openTxn();
+    txnid = openTxn();
+    HeartbeatTxnRangeResponse rsp =
+        txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
+    assertEquals(0, rsp.getAborted().size());
+    assertEquals(0, rsp.getNosuch().size());
+  }
+
+  @Test
+  public void heartbeatTxnRangeOneCommitted() throws Exception {
+    long txnid = openTxn();
+    assertEquals(1, txnid);
+    txnHandler.commitTxn(new CommitTxnRequest(1));
+    txnid = openTxn();
+    txnid = openTxn();
+    HeartbeatTxnRangeResponse rsp =
+      txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
+    assertEquals(1, rsp.getNosuchSize());
+    Long txn = rsp.getNosuch().iterator().next();
+    assertEquals(1L, (long)txn);
+    assertEquals(0, rsp.getAborted().size());
+  }
+
+  @Test
+  public void heartbeatTxnRangeOneAborted() throws Exception {
+    long txnid = openTxn();
+    assertEquals(1, txnid);
+    txnid = openTxn();
+    txnid = openTxn();
+    txnHandler.abortTxn(new AbortTxnRequest(3));
+    HeartbeatTxnRangeResponse rsp =
+      txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
+    assertEquals(1, rsp.getAbortedSize());
+    Long txn = rsp.getAborted().iterator().next();
+    assertEquals(3L, (long)txn);
+    assertEquals(0, rsp.getNosuch().size());
+  }
+
+  @Test
+  public void testLockTimeout() throws Exception {
+    long timeout = txnHandler.setTimeout(1);
+    try {
+      LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+      comp.setTablename("mytable");
+      comp.setPartitionname("mypartition");
+      comp.setOperationType(DataOperationType.NO_TXN);
+      List<LockComponent> components = new ArrayList<LockComponent>(1);
+      components.add(comp);
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      LockResponse res = txnHandler.lock(req);
+      assertTrue(res.getState() == LockState.ACQUIRED);
+      Thread.sleep(10);
+      txnHandler.performTimeOuts();
+      txnHandler.checkLock(new CheckLockRequest(res.getLockid()));
+      fail("Told there was a lock, when it should have timed out.");
+    } catch (NoSuchLockException e) {
+    } finally {
+      txnHandler.setTimeout(timeout);
+    }
+  }
+
+  @Test
+  public void testRecoverManyTimeouts() throws Exception {
+    long timeout = txnHandler.setTimeout(1);
+    try {
+      txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost"));
+      Thread.sleep(10);
+      txnHandler.performTimeOuts();
+      GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo();
+      int numAborted = 0;
+      for (TxnInfo txnInfo : rsp.getOpen_txns()) {
+        assertEquals(TxnState.ABORTED, txnInfo.getState());
+        numAborted++;
+      }
+      assertEquals(503, numAborted);
+    } finally {
+      txnHandler.setTimeout(timeout);
+    }
+
+
+  }
+
+  @Test
+  public void testHeartbeatNoLock() throws Exception {
+    HeartbeatRequest h = new HeartbeatRequest();
+    h.setLockid(29389839L);
+    try {
+      txnHandler.heartbeat(h);
+      fail("Told there was a lock, when there wasn't.");
+    } catch (NoSuchLockException e) {
+    }
+  }
+
+  @Test
+  public void testCompactMajorWithPartition() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    assertEquals(1, compacts.size());
+    ShowCompactResponseElement c = compacts.get(0);
+    assertEquals("foo", c.getDbname());
+    assertEquals("bar", c.getTablename());
+    assertEquals("ds=today", c.getPartitionname());
+    assertEquals(CompactionType.MAJOR, c.getType());
+    assertEquals("initiated", c.getState());
+    assertEquals(0L, c.getStart());
+  }
+
+  @Test
+  public void testCompactMinorNoPartition() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    rqst.setRunas("fred");
+    txnHandler.compact(rqst);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    assertEquals(1, compacts.size());
+    ShowCompactResponseElement c = compacts.get(0);
+    assertEquals("foo", c.getDbname());
+    assertEquals("bar", c.getTablename());
+    assertNull(c.getPartitionname());
+    assertEquals(CompactionType.MINOR, c.getType());
+    assertEquals("initiated", c.getState());
+    assertEquals(0L, c.getStart());
+    assertEquals("fred", c.getRunAs());
+  }
+
+  @Test
+  public void showLocks() throws Exception {
+    long begining = System.currentTimeMillis();
+    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+    comp.setOperationType(DataOperationType.NO_TXN);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    LockResponse res = txnHandler.lock(req);
+
+    // Open txn
+    long txnid = openTxn();
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb");
+    comp.setTablename("mytable");
+    comp.setOperationType(DataOperationType.SELECT);
+    components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    res = txnHandler.lock(req);
+
+    // Locks not associated with a txn
+    components = new ArrayList<LockComponent>(1);
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb");
+    comp.setTablename("yourtable");
+    comp.setPartitionname("yourpartition");
+    comp.setOperationType(DataOperationType.INSERT);
+    components.add(comp);
+    req = new LockRequest(components, "you", "remotehost");
+    res = txnHandler.lock(req);
+
+    ShowLocksResponse rsp = txnHandler.showLocks(new ShowLocksRequest());
+    List<ShowLocksResponseElement> locks = rsp.getLocks();
+    assertEquals(3, locks.size());
+    boolean[] saw = new boolean[locks.size()];
+    for (int i = 0; i < saw.length; i++) saw[i] = false;
+    for (ShowLocksResponseElement lock : locks) {
+      if (lock.getLockid() == 1) {
+        assertEquals(0, lock.getTxnid());
+        assertEquals("mydb", lock.getDbname());
+        assertNull(lock.getTablename());
+        assertNull(lock.getPartname());
+        assertEquals(LockState.ACQUIRED, lock.getState());
+        assertEquals(LockType.EXCLUSIVE, lock.getType());
+        assertTrue(lock.toString(), 0 != lock.getLastheartbeat());
+        assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
+            + " and " + System.currentTimeMillis(),
+            begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
+        assertEquals("me", lock.getUser());
+        assertEquals("localhost", lock.getHostname());
+        saw[0] = true;
+      } else if (lock.getLockid() == 2) {
+        assertEquals(1, lock.getTxnid());
+        assertEquals("mydb", lock.getDbname());
+        assertEquals("mytable", lock.getTablename());
+        assertNull(lock.getPartname());
+        assertEquals(LockState.WAITING, lock.getState());
+        assertEquals(LockType.SHARED_READ, lock.getType());
+        assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
+          lock.getTxnid() != 0);
+        assertEquals(0, lock.getAcquiredat());
+        assertEquals("me", lock.getUser());
+        assertEquals("localhost", lock.getHostname());
+        saw[1] = true;
+      } else if (lock.getLockid() == 3) {
+        assertEquals(0, lock.getTxnid());
+        assertEquals("yourdb", lock.getDbname());
+        assertEquals("yourtable", lock.getTablename());
+        assertEquals("yourpartition", lock.getPartname());
+        assertEquals(LockState.ACQUIRED, lock.getState());
+        assertEquals(LockType.SHARED_READ, lock.getType());
+        assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
+            System.currentTimeMillis() >= lock.getLastheartbeat());
+        assertTrue(begining <= lock.getAcquiredat() &&
+            System.currentTimeMillis() >= lock.getAcquiredat());
+        assertEquals("you", lock.getUser());
+        assertEquals("remotehost", lock.getHostname());
+        saw[2] = true;
+      } else {
+        fail("Unknown lock id");
+      }
+    }
+    for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]);
+  }
+
+  @Test
+  @Ignore("Wedges Derby")
+  public void deadlockDetected() throws Exception {
+    LOG.debug("Starting deadlock test");
+    if (txnHandler instanceof TxnHandler) {
+      final TxnHandler tHndlr = (TxnHandler)txnHandler;
+      Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = conn.createStatement();
+      long now = tHndlr.getDbTime(conn);
+      stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+          "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+          "'scooby.com')");
+      stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+          "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+          "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+          tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
+          "'scooby.com')");
+      conn.commit();
+      tHndlr.closeDbConn(conn);
+
+      final AtomicBoolean sawDeadlock = new AtomicBoolean();
+
+      final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      try {
+
+        for (int i = 0; i < 5; i++) {
+          Thread t1 = new Thread() {
+            @Override
+            public void run() {
+              try {
+                try {
+                  updateTxns(conn1);
+                  updateLocks(conn1);
+                  Thread.sleep(1000);
+                  conn1.commit();
+                  LOG.debug("no exception, no deadlock");
+                } catch (SQLException e) {
+                  try {
+                    tHndlr.checkRetryable(conn1, e, "thread t1");
+                    LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+                        e.getSQLState() + " class of exception is " + e.getClass().getName() +
+                        " msg is <" + e.getMessage() + ">");
+                  } catch (TxnHandler.RetryException de) {
+                    LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+                        "exception is " + e.getClass().getName() + " msg is <" + e
+                        .getMessage() + ">");
+                    sawDeadlock.set(true);
+                  }
+                }
+                conn1.rollback();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            }
+          };
+
+          Thread t2 = new Thread() {
+            @Override
+            public void run() {
+              try {
+                try {
+                  updateLocks(conn2);
+                  updateTxns(conn2);
+                  Thread.sleep(1000);
+                  conn2.commit();
+                  LOG.debug("no exception, no deadlock");
+                } catch (SQLException e) {
+                  try {
+                    tHndlr.checkRetryable(conn2, e, "thread t2");
+                    LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+                        e.getSQLState() + " class of exception is " + e.getClass().getName() +
+                        " msg is <" + e.getMessage() + ">");
+                  } catch (TxnHandler.RetryException de) {
+                    LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+                        "exception is " + e.getClass().getName() + " msg is <" + e
+                        .getMessage() + ">");
+                    sawDeadlock.set(true);
+                  }
+                }
+                conn2.rollback();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            }
+          };
+
+          t1.start();
+          t2.start();
+          t1.join();
+          t2.join();
+          if (sawDeadlock.get()) break;
+        }
+        assertTrue(sawDeadlock.get());
+      } finally {
+        conn1.rollback();
+        tHndlr.closeDbConn(conn1);
+        conn2.rollback();
+        tHndlr.closeDbConn(conn2);
+      }
+    }
+  }
+
+  /**
+   * This cannot be run against Derby (and thus not in UT) but it can be run against MySQL.
+   * 1. add to metastore/pom.xml
+   *     <dependency>
+   *      <groupId>mysql</groupId>
+   *      <artifactId>mysql-connector-java</artifactId>
+   *      <version>5.1.30</version>
+   *     </dependency>
+   * 2. Hack in the c'tor of this class
+   *     conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
+   *      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
+   *      conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
+   *      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
+   * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
+   *      
+   */
+  @Ignore("multiple threads wedge Derby")
+  @Test
+  public void testMutexAPI() throws Exception {
+    final TxnStore.MutexAPI api =  txnHandler.getMutexAPI();
+    final AtomicInteger stepTracker = new AtomicInteger(0);
+    /**
+     * counter = 0;
+     * Thread1 counter=1, lock, wait 4s, check counter(should be 2), counter=3, unlock
+     * Thread2 counter=2, lock (and block), inc counter, should be 4, unlock, counter=5
+     */
+    Thread t1 = new Thread("MutexTest1") {
+      public void run() {
+        try {
+          stepTracker.incrementAndGet();//now 1
+          TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+          Thread.sleep(4000);
+          //stepTracker should now be 2 which indicates t2 has started
+          Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get());
+          stepTracker.incrementAndGet();//now 3
+          handle.releaseLocks();
+        }
+        catch(Exception ex) {
+          throw new RuntimeException(ex.getMessage(), ex);
+        }
+      }
+    };
+    t1.setDaemon(true);
+    ErrorHandle ueh1 = new ErrorHandle();
+    t1.setUncaughtExceptionHandler(ueh1);
+    Thread t2 = new Thread("MutexTest2") {
+      public void run() {
+        try {
+          stepTracker.incrementAndGet();//now 2
+          //this should block until t1 unlocks
+          TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+          stepTracker.incrementAndGet();//now 4
+          Assert.assertEquals(4, stepTracker.get());
+          handle.releaseLocks();
+          stepTracker.incrementAndGet();//now 5
+        }
+        catch(Exception ex) {
+          throw new RuntimeException(ex.getMessage(), ex);
+        }
+      }
+    };
+    t2.setDaemon(true);
+    ErrorHandle ueh2 = new ErrorHandle();
+    t2.setUncaughtExceptionHandler(ueh2);
+    t1.start();
+    try {
+      Thread.sleep(1000);
+    }
+    catch(InterruptedException ex) {
+      LOG.info("Sleep was interrupted");
+    }
+    t2.start();
+    t1.join(6000);//so that test doesn't block
+    t2.join(6000);
+
+    if(ueh1.error != null) {
+      Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false);
+    }
+    if (ueh2.error != null) {
+      Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false);
+    }
+    Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
+  }
+  private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
+    Throwable error = null;
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
+      error = e;
+    }
+  }
+
+  @Test
+  public void testRetryableRegex() throws Exception {
+    SQLException sqlException = new SQLException("ORA-08177: can't serialize access for this transaction", "72000");
+    // Note that we have 3 regex'es below
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, "^Deadlock detected, roll back,.*08177.*,.*08178.*");
+    boolean result = TxnHandler.isRetryable(conf, sqlException);
+    Assert.assertTrue("regex should be retryable", result);
+
+    sqlException = new SQLException("This error message, has comma in it");
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, ".*comma.*");
+    result = TxnHandler.isRetryable(conf, sqlException);
+    Assert.assertTrue("regex should be retryable", result);
+  }
+
+  private void updateTxns(Connection conn) throws SQLException {
+    Statement stmt = conn.createStatement();
+    stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
+  }
+
+  private void updateLocks(Connection conn) throws SQLException {
+    Statement stmt = conn.createStatement();
+    stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1");
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TxnDbUtil.prepDb();
+    txnHandler = TxnUtils.getTxnStore(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TxnDbUtil.cleanDb();
+  }
+
+  private long openTxn() throws MetaException {
+    List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
+    return txns.get(0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index d80a03e..a1bd0fb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -26,9 +26,12 @@ import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HouseKeeperService;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
@@ -39,6 +42,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
+import org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService;
 import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
 import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -704,7 +708,7 @@ public class TestTxnCommands2 {
     while(houseKeeperService.getIsAliveCounter() <= lastCount) {
       if(iterCount++ >= maxIter) {
         //prevent test hangs
-        throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits");
+        throw new IllegalStateException("HouseKeeper didn't run after " + (iterCount - 1) + " waits");
       }
       try {
         Thread.sleep(100);//make sure it has run at least once
@@ -794,6 +798,41 @@ public class TestTxnCommands2 {
     Assert.assertTrue(exception.getMessage().contains("HIVETESTMODEFAILHEARTBEATER=true"));
   }
 
+  @Test
+  public void testOpenTxnsCounter() throws Exception {
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS, 3);
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, 10, TimeUnit.MILLISECONDS);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    OpenTxnsResponse openTxnsResponse = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
+
+    AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService();
+    runHouseKeeperService(openTxnsCounterService, hiveConf);  // will update current number of open txns to 3
+
+    MetaException exception = null;
+    // This should fail once it finds out the threshold has been reached
+    try {
+      txnHandler.openTxns(new OpenTxnRequest(1, "you", "localhost"));
+    } catch (MetaException e) {
+      exception = e;
+    }
+    Assert.assertNotNull("Opening new transaction shouldn't be allowed", exception);
+    Assert.assertTrue(exception.getMessage().equals("Maximum allowed number of open transactions has been reached. See hive.max.open.txns."));
+
+    // After committing the initial txns, and updating current number of open txns back to 0,
+    // new transactions should be allowed to open
+    for (long txnid : openTxnsResponse.getTxn_ids()) {
+      txnHandler.commitTxn(new CommitTxnRequest(txnid));
+    }
+    runHouseKeeperService(openTxnsCounterService, hiveConf);  // will update current number of open txns back to 0
+    exception = null;
+    try {
+      txnHandler.openTxns(new OpenTxnRequest(1, "him", "localhost"));
+    } catch (MetaException e) {
+      exception = e;
+    }
+    Assert.assertNull(exception);
+  }
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order


[2/3] hive git commit: HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)

Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
deleted file mode 100644
index 2804e21..0000000
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ /dev/null
@@ -1,1484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnOpenException;
-import org.apache.hadoop.hive.metastore.api.TxnState;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-
-/**
- * Tests for TxnHandler.
- */
-public class TestTxnHandler {
-  static final private String CLASS_NAME = TxnHandler.class.getName();
-  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-
-  private HiveConf conf = new HiveConf();
-  private TxnStore txnHandler;
-
-  public TestTxnHandler() throws Exception {
-    TxnDbUtil.setConfValues(conf);
-    LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
-    Configuration conf = ctx.getConfiguration();
-    conf.getLoggerConfig(CLASS_NAME).setLevel(Level.DEBUG);
-    ctx.updateLoggers(conf);
-    tearDown();
-  }
-
-  @Test
-  public void testValidTxnsEmpty() throws Exception {
-    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
-    assertEquals(0L, txnsInfo.getTxn_high_water_mark());
-    assertTrue(txnsInfo.getOpen_txns().isEmpty());
-    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
-    assertEquals(0L, txns.getTxn_high_water_mark());
-    assertTrue(txns.getOpen_txns().isEmpty());
-  }
-
-  @Test
-  public void testOpenTxn() throws Exception {
-    long first = openTxn();
-    assertEquals(1L, first);
-    long second = openTxn();
-    assertEquals(2L, second);
-    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
-    assertEquals(2L, txnsInfo.getTxn_high_water_mark());
-    assertEquals(2, txnsInfo.getOpen_txns().size());
-    assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
-    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(0).getState());
-    assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
-    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
-    assertEquals("me", txnsInfo.getOpen_txns().get(1).getUser());
-    assertEquals("localhost", txnsInfo.getOpen_txns().get(1).getHostname());
-
-    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
-    assertEquals(2L, txns.getTxn_high_water_mark());
-    assertEquals(2, txns.getOpen_txns().size());
-    boolean[] saw = new boolean[3];
-    for (int i = 0; i < saw.length; i++) saw[i] = false;
-    for (Long tid : txns.getOpen_txns()) {
-      saw[tid.intValue()] = true;
-    }
-    for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
-  }
-
-  @Test
-  public void testAbortTxn() throws Exception {
-    OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost"));
-    List<Long> txnList = openedTxns.getTxn_ids();
-    long first = txnList.get(0);
-    assertEquals(1L, first);
-    long second = txnList.get(1);
-    assertEquals(2L, second);
-    txnHandler.abortTxn(new AbortTxnRequest(1));
-    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
-    assertEquals(2L, txnsInfo.getTxn_high_water_mark());
-    assertEquals(2, txnsInfo.getOpen_txns().size());
-    assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
-    assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
-    assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
-    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
-
-    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
-    assertEquals(2L, txns.getTxn_high_water_mark());
-    assertEquals(2, txns.getOpen_txns().size());
-    boolean[] saw = new boolean[3];
-    for (int i = 0; i < saw.length; i++) saw[i] = false;
-    for (Long tid : txns.getOpen_txns()) {
-      saw[tid.intValue()] = true;
-    }
-    for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
-  }
-
-  @Test
-  public void testAbortInvalidTxn() throws Exception {
-    boolean caught = false;
-    try {
-      txnHandler.abortTxn(new AbortTxnRequest(195L));
-    } catch (NoSuchTxnException e) {
-      caught = true;
-    }
-    assertTrue(caught);
-  }
-
-  @Test
-  public void testValidTxnsNoneOpen() throws Exception {
-    txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost"));
-    txnHandler.commitTxn(new CommitTxnRequest(1));
-    txnHandler.commitTxn(new CommitTxnRequest(2));
-    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
-    assertEquals(2L, txnsInfo.getTxn_high_water_mark());
-    assertEquals(0, txnsInfo.getOpen_txns().size());
-    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
-    assertEquals(2L, txns.getTxn_high_water_mark());
-    assertEquals(0, txns.getOpen_txns().size());
-  }
-
-  @Test
-  public void testValidTxnsSomeOpen() throws Exception {
-    txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
-    txnHandler.abortTxn(new AbortTxnRequest(1));
-    txnHandler.commitTxn(new CommitTxnRequest(2));
-    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
-    assertEquals(3L, txnsInfo.getTxn_high_water_mark());
-    assertEquals(2, txnsInfo.getOpen_txns().size());
-    assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
-    assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
-    assertEquals(3L, txnsInfo.getOpen_txns().get(1).getId());
-    assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
-
-    GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
-    assertEquals(3L, txns.getTxn_high_water_mark());
-    assertEquals(2, txns.getOpen_txns().size());
-    boolean[] saw = new boolean[4];
-    for (int i = 0; i < saw.length; i++) saw[i] = false;
-    for (Long tid : txns.getOpen_txns()) {
-      saw[tid.intValue()] = true;
-    }
-    assertTrue(saw[1]);
-    assertFalse(saw[2]);
-    assertTrue(saw[3]);
-  }
-
-  @Test
-  public void testLockDifferentDBs() throws Exception {
-    // Test that two different databases don't collide on their locks
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testLockSameDB() throws Exception {
-    // Test that two exclusive locks on the same database collide (the second one must wait)
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockDbLocksTable() throws Exception {
-    // Test that locking a database prevents locking of tables in the database
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    comp.setTablename("mytable");
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockDbDoesNotLockTableInDifferentDB() throws Exception {
-    // Test that locking a database does not prevent locking of tables in a different database
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    comp.setTablename("mytable");
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testLockDifferentTables() throws Exception {
-    // Test that two different tables don't collide on their locks
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    comp.setTablename("mytable");
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    comp.setTablename("yourtable");
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testLockSameTable() throws Exception {
-    // Test that two exclusive locks on the same table collide (the second one must wait)
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockTableLocksPartition() throws Exception {
-    // Test that locking a table prevents locking of partitions of the table
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockDifferentTableDoesntLockPartition() throws Exception {
-    // Test that locking a table does not prevent locking of partitions of a different table
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("yourtable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testLockDifferentPartitions() throws Exception {
-    // Test that two different partitions don't collide on their locks
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("yourpartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testLockSamePartition() throws Exception {
-    // Test that two exclusive locks on the same partition collide (the second one must wait)
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockSRSR() throws Exception {
-    // Test that two shared read locks can share a partition
-    LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.INSERT);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.SELECT);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testLockESRSR() throws Exception {
-    // Test that exclusive lock blocks shared reads
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.INSERT);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.SELECT);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockSRSW() throws Exception {
-    // Test that write can acquire after read
-    LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.INSERT);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testLockESRSW() throws Exception {
-    // Test that exclusive lock blocks read and write
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.SELECT);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.UPDATE);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockSRE() throws Exception {
-    // Test that read blocks exclusive
-    LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.SELECT);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockESRE() throws Exception {
-    // Test that exclusive blocks read and exclusive
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.SELECT);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockSWSR() throws Exception {
-    // Test that read can acquire after write
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.UPDATE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.SELECT);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testLockSWSWSR() throws Exception {
-    // Test that write blocks write but read can still acquire
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.UPDATE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.INSERT);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testWrongLockForOperation() throws Exception {
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    Exception expectedError = null;
-    try {
-      LockResponse res = txnHandler.lock(req);
-    }
-    catch(Exception e) {
-      expectedError = e;
-    }
-    Assert.assertTrue(expectedError != null && expectedError.getMessage().contains("Unexpected DataOperationType"));
-  }
-  @Test
-  public void testLockSWSWSW() throws Exception {
-    // Test that write blocks two writes
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockEESW() throws Exception {
-    // Test that exclusive blocks exclusive and write
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testLockEESR() throws Exception {
-    // Test that exclusive blocks exclusive and read
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.SELECT);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.WAITING);
-  }
-
-  @Test
-  public void testCheckLockAcquireAfterWaiting() throws Exception {
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    long txnId = openTxn();
-    req.setTxnid(txnId);
-    LockResponse res = txnHandler.lock(req);
-    long lockid1 = res.getLockid();
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.UPDATE);
-    components.clear();
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(openTxn());
-    res = txnHandler.lock(req);
-    long lockid2 = res.getLockid();
-    assertTrue(res.getState() == LockState.WAITING);
-
-    txnHandler.abortTxn(new AbortTxnRequest(txnId));
-    res = txnHandler.checkLock(new CheckLockRequest(lockid2));
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testCheckLockNoSuchLock() throws Exception {
-    try {
-      txnHandler.checkLock(new CheckLockRequest(23L));
-      fail("Allowed to check lock on non-existent lock");
-    } catch (NoSuchLockException e) {
-    }
-  }
-
-  @Test
-  public void testCheckLockTxnAborted() throws Exception {
-    // Test that when a transaction is aborted, the heartbeat fails
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    LockResponse res = txnHandler.lock(req);
-    long lockid = res.getLockid();
-    txnHandler.abortTxn(new AbortTxnRequest(txnid));
-    try {
-      // This will throw NoSuchLockException (even though it's the
-      // transaction we've closed) because that will have deleted the lock.
-      txnHandler.checkLock(new CheckLockRequest(lockid));
-      fail("Allowed to check lock on aborted transaction.");
-    } catch (NoSuchLockException e) {
-    }
-  }
-
-  @Test
-  public void testMultipleLock() throws Exception {
-    // Test more than one lock can be handled in a lock request
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(2);
-    components.add(comp);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("anotherpartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    long lockid = res.getLockid();
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    res = txnHandler.checkLock(new CheckLockRequest(lockid));
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.unlock(new UnlockRequest(lockid));
-    assertEquals(0, txnHandler.numLocksInLockTable());
-  }
-
-  @Test
-  public void testMultipleLockWait() throws Exception {
-    // Test that two shared read locks can share a partition
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(2);
-    components.add(comp);
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("anotherpartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    long lockid1 = res.getLockid();
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-
-    comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    res = txnHandler.lock(req);
-    long lockid2 = res.getLockid();
-    assertTrue(res.getState() == LockState.WAITING);
-
-    txnHandler.unlock(new UnlockRequest(lockid1));
-
-    res = txnHandler.checkLock(new CheckLockRequest(lockid2));
-    assertTrue(res.getState() == LockState.ACQUIRED);
-  }
-
-  @Test
-  public void testUnlockOnCommit() throws Exception {
-    // Test that committing unlocks
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,  "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.DELETE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.commitTxn(new CommitTxnRequest(txnid));
-    assertEquals(0, txnHandler.numLocksInLockTable());
-  }
-
-  @Test
-  public void testUnlockOnAbort() throws Exception {
-    // Test that committing unlocks
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.UPDATE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.abortTxn(new AbortTxnRequest(txnid));
-    assertEquals(0, txnHandler.numLocksInLockTable());
-  }
-
-  @Test
-  public void testUnlockWithTxn() throws Exception {
-    LOG.debug("Starting testUnlockWithTxn");
-    // Test that attempting to unlock locks associated with a transaction
-    // generates an error
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.DELETE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    LockResponse res = txnHandler.lock(req);
-    long lockid = res.getLockid();
-    try {
-      txnHandler.unlock(new UnlockRequest(lockid));
-      fail("Allowed to unlock lock associated with transaction.");
-    } catch (TxnOpenException e) {
-    }
-  }
-
-  @Test
-  public void testHeartbeatTxnAborted() throws Exception {
-    // Test that when a transaction is aborted, the heartbeat fails
-    openTxn();
-    txnHandler.abortTxn(new AbortTxnRequest(1));
-    HeartbeatRequest h = new HeartbeatRequest();
-    h.setTxnid(1);
-    try {
-      txnHandler.heartbeat(h);
-      fail("Told there was a txn, when it should have been aborted.");
-    } catch (TxnAbortedException e) {
-    }
- }
-
-  @Test
-  public void testHeartbeatNoTxn() throws Exception {
-    // Test that when a transaction is aborted, the heartbeat fails
-    HeartbeatRequest h = new HeartbeatRequest();
-    h.setTxnid(939393L);
-    try {
-      txnHandler.heartbeat(h);
-      fail("Told there was a txn, when there wasn't.");
-    } catch (NoSuchTxnException e) {
-    }
-  }
-
-  @Test
-  public void testHeartbeatLock() throws Exception {
-    conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
-    HeartbeatRequest h = new HeartbeatRequest();
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setTablename("mytable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    h.setLockid(res.getLockid());
-    for (int i = 0; i < 30; i++) {
-      try {
-        txnHandler.heartbeat(h);
-      } catch (NoSuchLockException e) {
-        fail("Told there was no lock, when the heartbeat should have kept it.");
-      }
-    }
-  }
-
-  @Test
-  public void heartbeatTxnRange() throws Exception {
-    long txnid = openTxn();
-    assertEquals(1, txnid);
-    txnid = openTxn();
-    txnid = openTxn();
-    HeartbeatTxnRangeResponse rsp =
-        txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
-    assertEquals(0, rsp.getAborted().size());
-    assertEquals(0, rsp.getNosuch().size());
-  }
-
-  @Test
-  public void heartbeatTxnRangeOneCommitted() throws Exception {
-    long txnid = openTxn();
-    assertEquals(1, txnid);
-    txnHandler.commitTxn(new CommitTxnRequest(1));
-    txnid = openTxn();
-    txnid = openTxn();
-    HeartbeatTxnRangeResponse rsp =
-      txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
-    assertEquals(1, rsp.getNosuchSize());
-    Long txn = rsp.getNosuch().iterator().next();
-    assertEquals(1L, (long)txn);
-    assertEquals(0, rsp.getAborted().size());
-  }
-
-  @Test
-  public void heartbeatTxnRangeOneAborted() throws Exception {
-    long txnid = openTxn();
-    assertEquals(1, txnid);
-    txnid = openTxn();
-    txnid = openTxn();
-    txnHandler.abortTxn(new AbortTxnRequest(3));
-    HeartbeatTxnRangeResponse rsp =
-      txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
-    assertEquals(1, rsp.getAbortedSize());
-    Long txn = rsp.getAborted().iterator().next();
-    assertEquals(3L, (long)txn);
-    assertEquals(0, rsp.getNosuch().size());
-  }
-
-  @Test
-  public void testLockTimeout() throws Exception {
-    long timeout = txnHandler.setTimeout(1);
-    try {
-      LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-      comp.setTablename("mytable");
-      comp.setPartitionname("mypartition");
-      comp.setOperationType(DataOperationType.NO_TXN);
-      List<LockComponent> components = new ArrayList<LockComponent>(1);
-      components.add(comp);
-      LockRequest req = new LockRequest(components, "me", "localhost");
-      LockResponse res = txnHandler.lock(req);
-      assertTrue(res.getState() == LockState.ACQUIRED);
-      Thread.sleep(10);
-      txnHandler.performTimeOuts();
-      txnHandler.checkLock(new CheckLockRequest(res.getLockid()));
-      fail("Told there was a lock, when it should have timed out.");
-    } catch (NoSuchLockException e) {
-    } finally {
-      txnHandler.setTimeout(timeout);
-    }
-  }
-
-  @Test
-  public void testRecoverManyTimeouts() throws Exception {
-    long timeout = txnHandler.setTimeout(1);
-    try {
-      txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost"));
-      Thread.sleep(10);
-      txnHandler.performTimeOuts();
-      GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo();
-      int numAborted = 0;
-      for (TxnInfo txnInfo : rsp.getOpen_txns()) {
-        assertEquals(TxnState.ABORTED, txnInfo.getState());
-        numAborted++;
-      }
-      assertEquals(503, numAborted);
-    } finally {
-      txnHandler.setTimeout(timeout);
-    }
-
-
-  }
-
-  @Test
-  public void testHeartbeatNoLock() throws Exception {
-    HeartbeatRequest h = new HeartbeatRequest();
-    h.setLockid(29389839L);
-    try {
-      txnHandler.heartbeat(h);
-      fail("Told there was a lock, when there wasn't.");
-    } catch (NoSuchLockException e) {
-    }
-  }
-
-  @Test
-  public void testCompactMajorWithPartition() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MAJOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(1, compacts.size());
-    ShowCompactResponseElement c = compacts.get(0);
-    assertEquals("foo", c.getDbname());
-    assertEquals("bar", c.getTablename());
-    assertEquals("ds=today", c.getPartitionname());
-    assertEquals(CompactionType.MAJOR, c.getType());
-    assertEquals("initiated", c.getState());
-    assertEquals(0L, c.getStart());
-  }
-
-  @Test
-  public void testCompactMinorNoPartition() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setRunas("fred");
-    txnHandler.compact(rqst);
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(1, compacts.size());
-    ShowCompactResponseElement c = compacts.get(0);
-    assertEquals("foo", c.getDbname());
-    assertEquals("bar", c.getTablename());
-    assertNull(c.getPartitionname());
-    assertEquals(CompactionType.MINOR, c.getType());
-    assertEquals("initiated", c.getState());
-    assertEquals(0L, c.getStart());
-    assertEquals("fred", c.getRunAs());
-  }
-
-  @Test
-  public void showLocks() throws Exception {
-    long begining = System.currentTimeMillis();
-    LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
-    comp.setOperationType(DataOperationType.NO_TXN);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    LockResponse res = txnHandler.lock(req);
-
-    // Open txn
-    long txnid = openTxn();
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.SELECT);
-    components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    res = txnHandler.lock(req);
-
-    // Locks not associated with a txn
-    components = new ArrayList<LockComponent>(1);
-    comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb");
-    comp.setTablename("yourtable");
-    comp.setPartitionname("yourpartition");
-    comp.setOperationType(DataOperationType.INSERT);
-    components.add(comp);
-    req = new LockRequest(components, "you", "remotehost");
-    res = txnHandler.lock(req);
-
-    ShowLocksResponse rsp = txnHandler.showLocks(new ShowLocksRequest());
-    List<ShowLocksResponseElement> locks = rsp.getLocks();
-    assertEquals(3, locks.size());
-    boolean[] saw = new boolean[locks.size()];
-    for (int i = 0; i < saw.length; i++) saw[i] = false;
-    for (ShowLocksResponseElement lock : locks) {
-      if (lock.getLockid() == 1) {
-        assertEquals(0, lock.getTxnid());
-        assertEquals("mydb", lock.getDbname());
-        assertNull(lock.getTablename());
-        assertNull(lock.getPartname());
-        assertEquals(LockState.ACQUIRED, lock.getState());
-        assertEquals(LockType.EXCLUSIVE, lock.getType());
-        assertTrue(lock.toString(), 0 != lock.getLastheartbeat());
-        assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
-            + " and " + System.currentTimeMillis(),
-            begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
-        assertEquals("me", lock.getUser());
-        assertEquals("localhost", lock.getHostname());
-        saw[0] = true;
-      } else if (lock.getLockid() == 2) {
-        assertEquals(1, lock.getTxnid());
-        assertEquals("mydb", lock.getDbname());
-        assertEquals("mytable", lock.getTablename());
-        assertNull(lock.getPartname());
-        assertEquals(LockState.WAITING, lock.getState());
-        assertEquals(LockType.SHARED_READ, lock.getType());
-        assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
-          lock.getTxnid() != 0);
-        assertEquals(0, lock.getAcquiredat());
-        assertEquals("me", lock.getUser());
-        assertEquals("localhost", lock.getHostname());
-        saw[1] = true;
-      } else if (lock.getLockid() == 3) {
-        assertEquals(0, lock.getTxnid());
-        assertEquals("yourdb", lock.getDbname());
-        assertEquals("yourtable", lock.getTablename());
-        assertEquals("yourpartition", lock.getPartname());
-        assertEquals(LockState.ACQUIRED, lock.getState());
-        assertEquals(LockType.SHARED_READ, lock.getType());
-        assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
-            System.currentTimeMillis() >= lock.getLastheartbeat());
-        assertTrue(begining <= lock.getAcquiredat() &&
-            System.currentTimeMillis() >= lock.getAcquiredat());
-        assertEquals("you", lock.getUser());
-        assertEquals("remotehost", lock.getHostname());
-        saw[2] = true;
-      } else {
-        fail("Unknown lock id");
-      }
-    }
-    for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]);
-  }
-
-  @Test
-  @Ignore("Wedges Derby")
-  public void deadlockDetected() throws Exception {
-    LOG.debug("Starting deadlock test");
-    if (txnHandler instanceof TxnHandler) {
-      final TxnHandler tHndlr = (TxnHandler)txnHandler;
-      Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-      Statement stmt = conn.createStatement();
-      long now = tHndlr.getDbTime(conn);
-      stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
-          "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
-          "'scooby.com')");
-      stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
-          "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
-          "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
-          tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
-          "'scooby.com')");
-      conn.commit();
-      tHndlr.closeDbConn(conn);
-
-      final AtomicBoolean sawDeadlock = new AtomicBoolean();
-
-      final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-      final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-      try {
-
-        for (int i = 0; i < 5; i++) {
-          Thread t1 = new Thread() {
-            @Override
-            public void run() {
-              try {
-                try {
-                  updateTxns(conn1);
-                  updateLocks(conn1);
-                  Thread.sleep(1000);
-                  conn1.commit();
-                  LOG.debug("no exception, no deadlock");
-                } catch (SQLException e) {
-                  try {
-                    tHndlr.checkRetryable(conn1, e, "thread t1");
-                    LOG.debug("Got an exception, but not a deadlock, SQLState is " +
-                        e.getSQLState() + " class of exception is " + e.getClass().getName() +
-                        " msg is <" + e.getMessage() + ">");
-                  } catch (TxnHandler.RetryException de) {
-                    LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
-                        "exception is " + e.getClass().getName() + " msg is <" + e
-                        .getMessage() + ">");
-                    sawDeadlock.set(true);
-                  }
-                }
-                conn1.rollback();
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-
-          Thread t2 = new Thread() {
-            @Override
-            public void run() {
-              try {
-                try {
-                  updateLocks(conn2);
-                  updateTxns(conn2);
-                  Thread.sleep(1000);
-                  conn2.commit();
-                  LOG.debug("no exception, no deadlock");
-                } catch (SQLException e) {
-                  try {
-                    tHndlr.checkRetryable(conn2, e, "thread t2");
-                    LOG.debug("Got an exception, but not a deadlock, SQLState is " +
-                        e.getSQLState() + " class of exception is " + e.getClass().getName() +
-                        " msg is <" + e.getMessage() + ">");
-                  } catch (TxnHandler.RetryException de) {
-                    LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
-                        "exception is " + e.getClass().getName() + " msg is <" + e
-                        .getMessage() + ">");
-                    sawDeadlock.set(true);
-                  }
-                }
-                conn2.rollback();
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          };
-
-          t1.start();
-          t2.start();
-          t1.join();
-          t2.join();
-          if (sawDeadlock.get()) break;
-        }
-        assertTrue(sawDeadlock.get());
-      } finally {
-        conn1.rollback();
-        tHndlr.closeDbConn(conn1);
-        conn2.rollback();
-        tHndlr.closeDbConn(conn2);
-      }
-    }
-  }
-
-  /**
-   * This cannnot be run against Derby (thus in UT) but it can run againt MySQL.
-   * 1. add to metastore/pom.xml
-   *     <dependency>
-   *      <groupId>mysql</groupId>
-   *      <artifactId>mysql-connector-java</artifactId>
-   *      <version>5.1.30</version>
-   *     </dependency>
-   * 2. Hack in the c'tor of this class
-   *     conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
-   *      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
-   *      conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
-   *      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
-   * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
-   *      
-   */
-  @Ignore("multiple threads wedge Derby")
-  @Test
-  public void testMutexAPI() throws Exception {
-    final TxnStore.MutexAPI api =  txnHandler.getMutexAPI();
-    final AtomicInteger stepTracker = new AtomicInteger(0);
-    /**
-     * counter = 0;
-     * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock 
-     * Thread2 counter=2, lock (and block), inc counter, should be 4
-     */
-    Thread t1 = new Thread("MutexTest1") {
-      public void run() {
-        try {
-          stepTracker.incrementAndGet();//now 1
-          TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
-          Thread.sleep(4000);
-          //stepTracker should now be 2 which indicates t2 has started
-          Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get());
-          stepTracker.incrementAndGet();//now 3
-          handle.releaseLocks();
-        }
-        catch(Exception ex) {
-          throw new RuntimeException(ex.getMessage(), ex);
-        }
-      }
-    };
-    t1.setDaemon(true);
-    ErrorHandle ueh1 = new ErrorHandle();
-    t1.setUncaughtExceptionHandler(ueh1);
-    Thread t2 = new Thread("MutexTest2") {
-      public void run() {
-        try {
-          stepTracker.incrementAndGet();//now 2
-          //this should block until t1 unlocks
-          TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
-          stepTracker.incrementAndGet();//now 4
-          Assert.assertEquals(4, stepTracker.get());
-          handle.releaseLocks();
-          stepTracker.incrementAndGet();//now 5
-        }
-        catch(Exception ex) {
-          throw new RuntimeException(ex.getMessage(), ex);
-        }
-      }
-    };
-    t2.setDaemon(true);
-    ErrorHandle ueh2 = new ErrorHandle();
-    t2.setUncaughtExceptionHandler(ueh2);
-    t1.start();
-    try {
-      Thread.sleep(1000);
-    }
-    catch(InterruptedException ex) {
-      LOG.info("Sleep was interrupted");
-    }
-    t2.start();
-    t1.join(6000);//so that test doesn't block
-    t2.join(6000);
-
-    if(ueh1.error != null) {
-      Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false);
-    }
-    if (ueh2.error != null) {
-      Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false);
-    }
-    Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
-  }
-  private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
-    Throwable error = null;
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-      LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
-      error = e;
-    }
-  }
-
-  @Test
-  public void testRetryableRegex() throws Exception {
-    SQLException sqlException = new SQLException("ORA-08177: can't serialize access for this transaction", "72000");
-    // Note that we have 3 regex'es below
-    conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, "^Deadlock detected, roll back,.*08177.*,.*08178.*");
-    boolean result = TxnHandler.isRetryable(conf, sqlException);
-    Assert.assertTrue("regex should be retryable", result);
-
-    sqlException = new SQLException("This error message, has comma in it");
-    conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, ".*comma.*");
-    result = TxnHandler.isRetryable(conf, sqlException);
-    Assert.assertTrue("regex should be retryable", result);
-  }
-
-  private void updateTxns(Connection conn) throws SQLException {
-    Statement stmt = conn.createStatement();
-    stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
-  }
-
-  private void updateLocks(Connection conn) throws SQLException {
-    Statement stmt = conn.createStatement();
-    stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1");
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    TxnDbUtil.prepDb();
-    txnHandler = TxnUtils.getTxnStore(conf);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    TxnDbUtil.cleanDb();
-  }
-
-  private long openTxn() throws MetaException {
-    List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
-    return txns.get(0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
new file mode 100644
index 0000000..f5eb8a1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Background running thread, periodically updating number of open transactions.
+ * Runs inside Hive Metastore Service.
+ */
+public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase {
+  private static final Logger LOG = LoggerFactory.getLogger(AcidOpenTxnsCounterService.class);
+  @Override
+  protected long getStartDelayMs() {
+    return 100;  // in milliseconds
+  }
+  @Override
+  protected long getIntervalMs() { // value of hive.count.open.txns.interval, converted to milliseconds
+    return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+  @Override
+  protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+    return new OpenTxnsCounter(hiveConf, isAliveCounter);
+  }
+  @Override
+  public String getServiceDescription() {
+    return "Count number of open transactions";
+  }
+  /** Task that asks the TxnStore to recount open transactions and then increments isAliveCounter. */
+  private static final class OpenTxnsCounter implements Runnable {
+    private final TxnStore txnHandler;
+    private final AtomicInteger isAliveCounter; // supplied by the caller; bumped once per successful run
+    private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+      txnHandler = TxnUtils.getTxnStore(hiveConf);
+      this.isAliveCounter = isAliveCounter;
+    }
+    @Override
+    public void run() {
+      try {
+        long startTime = System.currentTimeMillis();
+        txnHandler.countOpenTxns();
+        int count = isAliveCounter.incrementAndGet();
+        LOG.info("OpenTxnsCounter ran for {} seconds.  isAliveCounter={}", (System.currentTimeMillis() - startTime) / 1000, count);
+      } catch (Throwable t) { // log and swallow: nothing is rethrown from this task
+        // Both placeholders live in the format string; the trailing Throwable is logged with its stack trace.
+        LOG.error("Serious error in {}: {}", Thread.currentThread().getName(), t.getMessage(), t);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
new file mode 100644
index 0000000..f513d0f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+/**
+ * Tests for the compaction-related methods of the TxnStore returned by TxnUtils.getTxnStore().
+ */
+public class TestCompactionTxnHandler {
+
+  private HiveConf conf = new HiveConf();
+  private TxnStore txnHandler;
+
+  public TestCompactionTxnHandler() throws Exception {
+    TxnDbUtil.setConfValues(conf);
+    tearDown(); // tearDown() calls TxnDbUtil.cleanDb(), so each instance starts by cleaning the DB
+  }
+
+  @Test
+  public void testFindNextToCompact() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    long now = System.currentTimeMillis();
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+    assertEquals("foo", ci.dbname);
+    assertEquals("bar", ci.tableName);
+    assertEquals("ds=today", ci.partName);
+    assertEquals(CompactionType.MINOR, ci.type);
+    assertNull(ci.runAs);
+    assertNull(txnHandler.findNextToCompact("fred")); // the single request was already handed out above
+
+    txnHandler.setRunAs(ci.id, "bob");
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    assertEquals(1, compacts.size());
+    ShowCompactResponseElement c = compacts.get(0);
+    assertEquals("foo", c.getDbname());
+    assertEquals("bar", c.getTablename());
+    assertEquals("ds=today", c.getPartitionname());
+    assertEquals(CompactionType.MINOR, c.getType());
+    assertEquals("working", c.getState());
+    assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now); // start must be within 5000 ms of 'now'
+    assertEquals("fred", c.getWorkerid());
+    assertEquals("bob", c.getRunAs());
+  }
+
+  @Test
+  public void testFindNextToCompact2() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+
+    rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    rqst.setPartitionname("ds=yesterday");
+    txnHandler.compact(rqst);
+
+    long now = System.currentTimeMillis();
+    boolean expectToday = false; // records which partition the second call should return
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+    assertEquals("foo", ci.dbname);
+    assertEquals("bar", ci.tableName);
+    if ("ds=today".equals(ci.partName)) expectToday = false;
+    else if ("ds=yesterday".equals(ci.partName)) expectToday = true;
+    else fail("partition name should have been today or yesterday but was " + ci.partName);
+    assertEquals(CompactionType.MINOR, ci.type);
+
+    ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+    assertEquals("foo", ci.dbname);
+    assertEquals("bar", ci.tableName);
+    if (expectToday) assertEquals("ds=today", ci.partName);
+    else assertEquals("ds=yesterday", ci.partName);
+    assertEquals(CompactionType.MINOR, ci.type);
+
+    assertNull(txnHandler.findNextToCompact("fred"));
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    assertEquals(2, compacts.size());
+    for (ShowCompactResponseElement e : compacts) {
+      assertEquals("working", e.getState());
+      assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now);
+      assertEquals("fred", e.getWorkerid());
+    }
+  }
+
+  @Test
+  public void testFindNextToCompactNothingToCompact() throws Exception {
+    assertNull(txnHandler.findNextToCompact("fred"));
+  }
+
+  @Test
+  public void testMarkCompacted() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+
+    txnHandler.markCompacted(ci);
+    assertNull(txnHandler.findNextToCompact("fred"));
+
+
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    assertEquals(1, compacts.size());
+    ShowCompactResponseElement c = compacts.get(0);
+    assertEquals("foo", c.getDbname());
+    assertEquals("bar", c.getTablename());
+    assertEquals("ds=today", c.getPartitionname());
+    assertEquals(CompactionType.MINOR, c.getType());
+    assertEquals("ready for cleaning", c.getState());
+    assertNull(c.getWorkerid());
+  }
+
+  @Test
+  public void testFindNextToClean() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    assertEquals(0, txnHandler.findReadyToClean().size());
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+
+    assertEquals(0, txnHandler.findReadyToClean().size());
+    txnHandler.markCompacted(ci);
+    assertNull(txnHandler.findNextToCompact("fred"));
+
+    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+    assertEquals(1, toClean.size());
+    assertNull(txnHandler.findNextToCompact("fred"));
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    assertEquals(1, compacts.size());
+    ShowCompactResponseElement c = compacts.get(0);
+    assertEquals("foo", c.getDbname());
+    assertEquals("bar", c.getTablename());
+    assertEquals("ds=today", c.getPartitionname());
+    assertEquals(CompactionType.MINOR, c.getType());
+    assertEquals("ready for cleaning", c.getState());
+    assertNull(c.getWorkerid());
+  }
+
+  @Test
+  public void testMarkCleaned() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    assertEquals(0, txnHandler.findReadyToClean().size());
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+
+    assertEquals(0, txnHandler.findReadyToClean().size());
+    txnHandler.markCompacted(ci);
+    assertNull(txnHandler.findNextToCompact("fred"));
+
+    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+    assertEquals(1, toClean.size());
+    assertNull(txnHandler.findNextToCompact("fred"));
+    txnHandler.markCleaned(ci);
+    assertNull(txnHandler.findNextToCompact("fred"));
+    assertEquals(0, txnHandler.findReadyToClean().size());
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    assertEquals(1, rsp.getCompactsSize());
+    assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+  }
+
+  @Test
+  public void testRevokeFromLocalWorkers() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
+    assertNotNull(txnHandler.findNextToCompact("bob-193892"));
+    assertNotNull(txnHandler.findNextToCompact("fred-193893"));
+    txnHandler.revokeFromLocalWorkers("fred");
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    assertEquals(3, compacts.size());
+    boolean sawWorkingBob = false;
+    int initiatedCount = 0;
+    for (ShowCompactResponseElement c : compacts) {
+      if (c.getState().equals("working")) {
+        assertEquals("bob-193892", c.getWorkerid());
+        sawWorkingBob = true;
+      } else if (c.getState().equals("initiated")) {
+        initiatedCount++;
+      } else {
+        fail("Unexpected state");
+      }
+    }
+    assertTrue(sawWorkingBob);
+    assertEquals(2, initiatedCount);
+  }
+
+  @Test
+  public void testRevokeTimedOutWorkers() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
+    Thread.sleep(200); // ages the first claim past the 100 passed to revokeTimedoutWorkers (presumably ms - confirm)
+    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
+    txnHandler.revokeTimedoutWorkers(100);
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    assertEquals(2, compacts.size());
+    boolean sawWorking = false, sawInitiated = false;
+    for (ShowCompactResponseElement c : compacts) {
+      if (c.getState().equals("working"))  sawWorking = true;
+      else if (c.getState().equals("initiated")) sawInitiated = true;
+      else fail("Unexpected state");
+    }
+    assertTrue(sawWorking);
+    assertTrue(sawInitiated);
+  }
+
+  @Test
+  public void testFindPotentialCompactions() throws Exception {
+    // Test that committing unlocks
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
+        "mydb");
+    comp.setTablename("mytable");
+    comp.setOperationType(DataOperationType.UPDATE);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
+        "mydb");
+    comp.setTablename("yourtable");
+    comp.setPartitionname("mypartition");
+    comp.setOperationType(DataOperationType.UPDATE);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    txnHandler.commitTxn(new CommitTxnRequest(txnid));
+    assertEquals(0, txnHandler.numLocksInLockTable());
+
+    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
+    assertEquals(2, potentials.size());
+    boolean sawMyTable = false, sawYourTable = false;
+    for (CompactionInfo ci : potentials) {
+      sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") &&
+          ci.partName ==  null);
+      sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") &&
+          ci.partName.equals("mypartition"));
+    }
+    assertTrue(sawMyTable);
+    assertTrue(sawYourTable);
+  }
+
+  // TODO test changes to mark cleaned to clean txns and txn_components (NOTE(review): possibly covered by the next test - confirm)
+
+  @Test
+  public void testMarkCleanedCleansTxnsAndTxnComponents()
+      throws Exception {
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
+        "mydb");
+    comp.setTablename("mytable");
+    comp.setOperationType(DataOperationType.INSERT);
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    LockResponse res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+    txnid = openTxn();
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("yourtable");
+    comp.setOperationType(DataOperationType.DELETE);
+    components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+    txnid = openTxn();
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("foo");
+    comp.setPartitionname("bar");
+    comp.setOperationType(DataOperationType.UPDATE);
+    components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+
+    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+    comp.setTablename("foo");
+    comp.setPartitionname("baz");
+    comp.setOperationType(DataOperationType.UPDATE);
+    components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    res = txnHandler.lock(req);
+    assertTrue(res.getState() == LockState.ACQUIRED);
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+    CompactionInfo ci = new CompactionInfo();
+
+    // Now clean them and check that they are removed from the count.
+    CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
+    txnHandler.compact(rqst);
+    assertEquals(0, txnHandler.findReadyToClean().size());
+    ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+    txnHandler.markCompacted(ci);
+
+    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+    assertEquals(1, toClean.size());
+    txnHandler.markCleaned(ci);
+
+    // Check that we are cleaning up the empty aborted transactions
+    GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
+    assertEquals(3, txnList.getOpen_txnsSize()); // the three txns aborted above are all still listed
+    txnHandler.cleanEmptyAbortedTxns();
+    txnList = txnHandler.getOpenTxns();
+    assertEquals(2, txnList.getOpen_txnsSize()); // one of them is gone after cleanEmptyAbortedTxns()
+
+    rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
+    rqst.setPartitionname("bar");
+    txnHandler.compact(rqst);
+    assertEquals(0, txnHandler.findReadyToClean().size());
+    ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+    txnHandler.markCompacted(ci);
+
+    toClean = txnHandler.findReadyToClean();
+    assertEquals(1, toClean.size());
+    txnHandler.markCleaned(ci);
+
+    txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
+    txnHandler.cleanEmptyAbortedTxns();
+    txnList = txnHandler.getOpenTxns();
+    assertEquals(3, txnList.getOpen_txnsSize());
+  }
+
+  @Test
+  public void addDynamicPartitions() throws Exception {
+    String dbName = "default";
+    String tableName = "adp_table";
+    OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
+    long txnId = openTxns.getTxn_ids().get(0);
+    // lock a table, as in dynamic partitions
+    LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
+    lc.setTablename(tableName);
+    DataOperationType dop = DataOperationType.UPDATE; 
+    lc.setOperationType(dop);
+    LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
+    lr.setTxnid(txnId);
+    LockResponse lock = txnHandler.lock(lr);
+    assertEquals(LockState.ACQUIRED, lock.getState());
+
+    AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName,
+      Arrays.asList("ds=yesterday", "ds=today"));
+    adp.setOperationType(dop);
+    txnHandler.addDynamicPartitions(adp);
+    txnHandler.commitTxn(new CommitTxnRequest(txnId));
+
+    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000);
+    assertEquals(2, potentials.size());
+    SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials); // fixed iteration order; needs CompactionInfo to be Comparable
+
+    int i = 0;
+    for (CompactionInfo ci : sorted) {
+      assertEquals(dbName, ci.dbname);
+      assertEquals(tableName, ci.tableName);
+      switch (i++) {
+        case 0: assertEquals("ds=today", ci.partName); break;
+        case 1: assertEquals("ds=yesterday", ci.partName); break;
+      default: throw new RuntimeException("What?");
+      }
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TxnDbUtil.prepDb();
+    txnHandler = TxnUtils.getTxnStore(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TxnDbUtil.cleanDb();
+  }
+
+  private long openTxn() throws MetaException {
+    List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
+    return txns.get(0);
+  }
+
+}


[3/3] hive git commit: HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)

Posted by we...@apache.org.
HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/259e8be1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/259e8be1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/259e8be1

Branch: refs/heads/master
Commit: 259e8be1d4486c6a17b8c240e43154c5a839524e
Parents: 360dfa0
Author: Wei Zheng <we...@apache.org>
Authored: Fri May 20 09:50:44 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Fri May 20 09:50:44 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    6 +
 .../hadoop/hive/metastore/txn/TxnHandler.java   |   79 +
 .../hadoop/hive/metastore/txn/TxnStore.java     |    6 +
 .../metastore/txn/TestCompactionTxnHandler.java |  466 ------
 .../hive/metastore/txn/TestTxnHandler.java      | 1484 ------------------
 .../hive/ql/txn/AcidOpenTxnsCounterService.java |   69 +
 .../metastore/txn/TestCompactionTxnHandler.java |  466 ++++++
 .../hive/metastore/txn/TestTxnHandler.java      | 1484 ++++++++++++++++++
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   41 +-
 9 files changed, 2150 insertions(+), 1951 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9cc8fbe..4cfa5f1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1681,6 +1681,12 @@ public class HiveConf extends Configuration {
         " of the lock manager is dumped to log file.  This is for debugging.  See also " +
         "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
+    HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
+        "current open transactions reach this limit, future open transaction requests will be \n" +
+        "rejected, until this number goes below the limit."),
+    HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "1s",
+        new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open transactions."),
+
     HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000,
         "Maximum number of transactions that can be fetched in one call to open_txns().\n" +
         "This controls how many transactions streaming agents such as Flume or Storm open\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index abaff34..82d685d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,6 +168,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  // Maximum number of open transactions that's allowed; read from hive.max.open.txns when the conf is set
+  private static volatile int maxOpenTxns = 0;
+  // Number of open txns as of the most recent countOpenTxns() run (not updated by openTxns() itself)
+  private static volatile long numOpenTxns = 0;
+  // Set by openTxns() once numOpenTxns >= maxOpenTxns; cleared again only when it drops below 90% of the max
+  private static volatile boolean tooManyOpenTxns = false;
+  // The HouseKeeperService (AcidOpenTxnsCounterService) that counts open transactions; started on first openTxns()
+  private static volatile HouseKeeperService openTxnsCounter = null;
+
   /**
    * Number of consecutive deadlocks we have seen
    */
@@ -236,6 +246,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         TimeUnit.MILLISECONDS);
     retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
+    maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS);
   }
 
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -362,7 +373,45 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       return getOpenTxns();
     }
   }
+
+  private static void startHouseKeeperService(HiveConf conf, Class c){ // instantiates c reflectively and starts it
+    try {
+      openTxnsCounter = (HouseKeeperService)c.newInstance();
+      openTxnsCounter.start(conf);
+    } catch (Exception ex) {
+      // openTxnsCounter is still null when instantiation itself failed, so name the class via c instead.
+      LOG.error("Failed to start {}.  The system will not handle {}.  Root Cause: ", c.getName(),
+          openTxnsCounter == null ? "(unknown)" : openTxnsCounter.getServiceDescription(), ex);
+    }
+  }
+
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
+    if (openTxnsCounter == null) {
+      synchronized (TxnHandler.class) {
+        try {
+          if (openTxnsCounter == null) {
+            startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService"));
+          }
+        } catch (ClassNotFoundException e) {
+          throw new MetaException(e.getMessage());
+        }
+      }
+    }
+
+    if (!tooManyOpenTxns && numOpenTxns >= maxOpenTxns) {
+      tooManyOpenTxns = true;
+    }
+    if (tooManyOpenTxns) {
+      if (numOpenTxns < maxOpenTxns * 0.9) {
+        tooManyOpenTxns = false;
+      } else {
+        LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " +
+            "reached. Current number of open transactions: " + numOpenTxns);
+        throw new MetaException("Maximum allowed number of open transactions has been reached. " +
+            "See hive.max.open.txns.");
+      }
+    }
+
     int numTxns = rqst.getNum_txns();
     try {
       Connection dbConn = null;
@@ -2856,6 +2905,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  public void countOpenTxns() throws MetaException { // refreshes the static numOpenTxns from the TXNS table
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (!rs.next()) { // no row at all: numOpenTxns keeps its previous value
+          LOG.error("Transaction database not properly configured, " +
+              "can't find txn_state from TXNS.");
+        } else {
+          numOpenTxns = rs.getLong(1); // the value openTxns() compares against maxOpenTxns
+        }
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        LOG.info("Failed to update number of open transactions");
+        checkRetryable(dbConn, e, "countOpenTxns()"); // presumably throws RetryException when retryable - confirm
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      countOpenTxns(); // retry by re-running the whole count
+    }
+  }
+
   private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
     if (connPool != null) return;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 12be862..5b56aaf 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -77,6 +77,12 @@ public interface TxnStore {
   public GetOpenTxnsResponse getOpenTxns() throws MetaException;
 
   /**
+   * Count the currently open transactions. Nothing is returned; TxnHandler caches the result internally.
+   * @throws MetaException
+   */
+  public void countOpenTxns() throws MetaException;
+
+  /**
    * Open a set of transactions
    * @param rqst request to open transactions
    * @return information on opened transactions

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
deleted file mode 100644
index f513d0f..0000000
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-
-/**
- * Tests for TxnHandler.
- */
-public class TestCompactionTxnHandler {
-
-  private HiveConf conf = new HiveConf();
-  private TxnStore txnHandler;
-
-  public TestCompactionTxnHandler() throws Exception {
-    TxnDbUtil.setConfValues(conf);
-    tearDown();
-  }
-
-  @Test
-  public void testFindNextToCompact() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    long now = System.currentTimeMillis();
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    assertEquals("foo", ci.dbname);
-    assertEquals("bar", ci.tableName);
-    assertEquals("ds=today", ci.partName);
-    assertEquals(CompactionType.MINOR, ci.type);
-    assertNull(ci.runAs);
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    txnHandler.setRunAs(ci.id, "bob");
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(1, compacts.size());
-    ShowCompactResponseElement c = compacts.get(0);
-    assertEquals("foo", c.getDbname());
-    assertEquals("bar", c.getTablename());
-    assertEquals("ds=today", c.getPartitionname());
-    assertEquals(CompactionType.MINOR, c.getType());
-    assertEquals("working", c.getState());
-    assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now);
-    assertEquals("fred", c.getWorkerid());
-    assertEquals("bob", c.getRunAs());
-  }
-
-  @Test
-  public void testFindNextToCompact2() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-
-    rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=yesterday");
-    txnHandler.compact(rqst);
-
-    long now = System.currentTimeMillis();
-    boolean expectToday = false;
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    assertEquals("foo", ci.dbname);
-    assertEquals("bar", ci.tableName);
-    if ("ds=today".equals(ci.partName)) expectToday = false;
-    else if ("ds=yesterday".equals(ci.partName)) expectToday = true;
-    else fail("partition name should have been today or yesterday but was " + ci.partName);
-    assertEquals(CompactionType.MINOR, ci.type);
-
-    ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    assertEquals("foo", ci.dbname);
-    assertEquals("bar", ci.tableName);
-    if (expectToday) assertEquals("ds=today", ci.partName);
-    else assertEquals("ds=yesterday", ci.partName);
-    assertEquals(CompactionType.MINOR, ci.type);
-
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(2, compacts.size());
-    for (ShowCompactResponseElement e : compacts) {
-      assertEquals("working", e.getState());
-      assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now);
-      assertEquals("fred", e.getWorkerid());
-    }
-  }
-
-  @Test
-  public void testFindNextToCompactNothingToCompact() throws Exception {
-    assertNull(txnHandler.findNextToCompact("fred"));
-  }
-
-  @Test
-  public void testMarkCompacted() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-
-    txnHandler.markCompacted(ci);
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(1, compacts.size());
-    ShowCompactResponseElement c = compacts.get(0);
-    assertEquals("foo", c.getDbname());
-    assertEquals("bar", c.getTablename());
-    assertEquals("ds=today", c.getPartitionname());
-    assertEquals(CompactionType.MINOR, c.getType());
-    assertEquals("ready for cleaning", c.getState());
-    assertNull(c.getWorkerid());
-  }
-
-  @Test
-  public void testFindNextToClean() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    txnHandler.markCompacted(ci);
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-    assertEquals(1, toClean.size());
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(1, compacts.size());
-    ShowCompactResponseElement c = compacts.get(0);
-    assertEquals("foo", c.getDbname());
-    assertEquals("bar", c.getTablename());
-    assertEquals("ds=today", c.getPartitionname());
-    assertEquals(CompactionType.MINOR, c.getType());
-    assertEquals("ready for cleaning", c.getState());
-    assertNull(c.getWorkerid());
-  }
-
-  @Test
-  public void testMarkCleaned() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    txnHandler.markCompacted(ci);
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-    assertEquals(1, toClean.size());
-    assertNull(txnHandler.findNextToCompact("fred"));
-    txnHandler.markCleaned(ci);
-    assertNull(txnHandler.findNextToCompact("fred"));
-    assertEquals(0, txnHandler.findReadyToClean().size());
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    assertEquals(1, rsp.getCompactsSize());
-    assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
-  }
-
-  @Test
-  public void testRevokeFromLocalWorkers() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
-    assertNotNull(txnHandler.findNextToCompact("bob-193892"));
-    assertNotNull(txnHandler.findNextToCompact("fred-193893"));
-    txnHandler.revokeFromLocalWorkers("fred");
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(3, compacts.size());
-    boolean sawWorkingBob = false;
-    int initiatedCount = 0;
-    for (ShowCompactResponseElement c : compacts) {
-      if (c.getState().equals("working")) {
-        assertEquals("bob-193892", c.getWorkerid());
-        sawWorkingBob = true;
-      } else if (c.getState().equals("initiated")) {
-        initiatedCount++;
-      } else {
-        fail("Unexpected state");
-      }
-    }
-    assertTrue(sawWorkingBob);
-    assertEquals(2, initiatedCount);
-  }
-
-  @Test
-  public void testRevokeTimedOutWorkers() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-
-    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
-    Thread.sleep(200);
-    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
-    txnHandler.revokeTimedoutWorkers(100);
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(2, compacts.size());
-    boolean sawWorking = false, sawInitiated = false;
-    for (ShowCompactResponseElement c : compacts) {
-      if (c.getState().equals("working"))  sawWorking = true;
-      else if (c.getState().equals("initiated")) sawInitiated = true;
-      else fail("Unexpected state");
-    }
-    assertTrue(sawWorking);
-    assertTrue(sawInitiated);
-  }
-
-  @Test
-  public void testFindPotentialCompactions() throws Exception {
-    // Test that committing unlocks
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
-        "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.UPDATE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
-        "mydb");
-    comp.setTablename("yourtable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.UPDATE);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.commitTxn(new CommitTxnRequest(txnid));
-    assertEquals(0, txnHandler.numLocksInLockTable());
-
-    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
-    assertEquals(2, potentials.size());
-    boolean sawMyTable = false, sawYourTable = false;
-    for (CompactionInfo ci : potentials) {
-      sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") &&
-          ci.partName ==  null);
-      sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") &&
-          ci.partName.equals("mypartition"));
-    }
-    assertTrue(sawMyTable);
-    assertTrue(sawYourTable);
-  }
-
-  // TODO test changes to mark cleaned to clean txns and txn_components
-
-  @Test
-  public void testMarkCleanedCleansTxnsAndTxnComponents()
-      throws Exception {
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
-        "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.INSERT);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
-    txnid = openTxn();
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("yourtable");
-    comp.setOperationType(DataOperationType.DELETE);
-    components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
-    txnid = openTxn();
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("foo");
-    comp.setPartitionname("bar");
-    comp.setOperationType(DataOperationType.UPDATE);
-    components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("foo");
-    comp.setPartitionname("baz");
-    comp.setOperationType(DataOperationType.UPDATE);
-    components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
-    CompactionInfo ci = new CompactionInfo();
-
-    // Now clean them and check that they are removed from the count.
-    CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
-    txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    txnHandler.markCompacted(ci);
-
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-    assertEquals(1, toClean.size());
-    txnHandler.markCleaned(ci);
-
-    // Check that we are cleaning up the empty aborted transactions
-    GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
-    assertEquals(3, txnList.getOpen_txnsSize());
-    txnHandler.cleanEmptyAbortedTxns();
-    txnList = txnHandler.getOpenTxns();
-    assertEquals(2, txnList.getOpen_txnsSize());
-
-    rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
-    rqst.setPartitionname("bar");
-    txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    txnHandler.markCompacted(ci);
-
-    toClean = txnHandler.findReadyToClean();
-    assertEquals(1, toClean.size());
-    txnHandler.markCleaned(ci);
-
-    txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
-    txnHandler.cleanEmptyAbortedTxns();
-    txnList = txnHandler.getOpenTxns();
-    assertEquals(3, txnList.getOpen_txnsSize());
-  }
-
-  @Test
-  public void addDynamicPartitions() throws Exception {
-    String dbName = "default";
-    String tableName = "adp_table";
-    OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
-    long txnId = openTxns.getTxn_ids().get(0);
-    // lock a table, as in dynamic partitions
-    LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
-    lc.setTablename(tableName);
-    DataOperationType dop = DataOperationType.UPDATE; 
-    lc.setOperationType(dop);
-    LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
-    lr.setTxnid(txnId);
-    LockResponse lock = txnHandler.lock(lr);
-    assertEquals(LockState.ACQUIRED, lock.getState());
-
-    AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName,
-      Arrays.asList("ds=yesterday", "ds=today"));
-    adp.setOperationType(dop);
-    txnHandler.addDynamicPartitions(adp);
-    txnHandler.commitTxn(new CommitTxnRequest(txnId));
-
-    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000);
-    assertEquals(2, potentials.size());
-    SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials);
-
-    int i = 0;
-    for (CompactionInfo ci : sorted) {
-      assertEquals(dbName, ci.dbname);
-      assertEquals(tableName, ci.tableName);
-      switch (i++) {
-        case 0: assertEquals("ds=today", ci.partName); break;
-        case 1: assertEquals("ds=yesterday", ci.partName); break;
-      default: throw new RuntimeException("What?");
-      }
-    }
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    TxnDbUtil.prepDb();
-    txnHandler = TxnUtils.getTxnStore(conf);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    TxnDbUtil.cleanDb();
-  }
-
-  private long openTxn() throws MetaException {
-    List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
-    return txns.get(0);
-  }
-
-}