You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/05/20 16:51:00 UTC

[3/3] hive git commit: HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)

HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/259e8be1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/259e8be1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/259e8be1

Branch: refs/heads/master
Commit: 259e8be1d4486c6a17b8c240e43154c5a839524e
Parents: 360dfa0
Author: Wei Zheng <we...@apache.org>
Authored: Fri May 20 09:50:44 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Fri May 20 09:50:44 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    6 +
 .../hadoop/hive/metastore/txn/TxnHandler.java   |   79 +
 .../hadoop/hive/metastore/txn/TxnStore.java     |    6 +
 .../metastore/txn/TestCompactionTxnHandler.java |  466 ------
 .../hive/metastore/txn/TestTxnHandler.java      | 1484 ------------------
 .../hive/ql/txn/AcidOpenTxnsCounterService.java |   69 +
 .../metastore/txn/TestCompactionTxnHandler.java |  466 ++++++
 .../hive/metastore/txn/TestTxnHandler.java      | 1484 ++++++++++++++++++
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   41 +-
 9 files changed, 2150 insertions(+), 1951 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9cc8fbe..4cfa5f1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1681,6 +1681,12 @@ public class HiveConf extends Configuration {
         " of the lock manager is dumped to log file.  This is for debugging.  See also " +
         "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
+    HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
+        "current open transactions reach this limit, future open transaction requests will be \n" +
+        "rejected, until this number goes below the limit."),
+    HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "1s",
+        new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open transactions."),
+
     HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000,
         "Maximum number of transactions that can be fetched in one call to open_txns().\n" +
         "This controls how many transactions streaming agents such as Flume or Storm open\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index abaff34..82d685d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,6 +168,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  // Maximum number of open transactions that's allowed
+  private static volatile int maxOpenTxns = 0;
+  // Current number of open txns
+  private static volatile long numOpenTxns = 0;
+  // Whether number of open transactions reaches the threshold
+  private static volatile boolean tooManyOpenTxns = false;
+  // The HouseKeeperService (AcidOpenTxnsCounterService) for counting open transactions
+  private static volatile HouseKeeperService openTxnsCounter = null;
+
   /**
    * Number of consecutive deadlocks we have seen
    */
@@ -236,6 +246,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         TimeUnit.MILLISECONDS);
     retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
+    maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS);
   }
 
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -362,7 +373,45 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       return getOpenTxns();
     }
   }
+
+  private static void startHouseKeeperService(HiveConf conf, Class c){
+    try {
+      openTxnsCounter = (HouseKeeperService)c.newInstance();
+      openTxnsCounter.start(conf);
+    } catch (Exception ex) {
+      // openTxnsCounter is still null when newInstance() itself failed, so identify the service
+      // by c; ex is the trailing argument so SLF4J logs it as the throwable with its stack trace.
+      LOG.error("Failed to start HouseKeeperService {}", c.getName(), ex);
+    }
+  }
+
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
+    if (openTxnsCounter == null) {
+      synchronized (TxnHandler.class) {
+        try {
+          if (openTxnsCounter == null) {
+            startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService"));
+          }
+        } catch (ClassNotFoundException e) {
+          throw new MetaException(e.getMessage());
+        }
+      }
+    }
+
+    if (!tooManyOpenTxns && numOpenTxns >= maxOpenTxns) {
+      tooManyOpenTxns = true;
+    }
+    if (tooManyOpenTxns) {
+      if (numOpenTxns < maxOpenTxns * 0.9) {
+        tooManyOpenTxns = false;
+      } else {
+        LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " +
+            "reached. Current number of open transactions: " + numOpenTxns);
+        throw new MetaException("Maximum allowed number of open transactions has been reached. " +
+            "See hive.max.open.txns.");
+      }
+    }
+
     int numTxns = rqst.getNum_txns();
     try {
       Connection dbConn = null;
@@ -2856,6 +2905,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  public void countOpenTxns() throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          LOG.error("Transaction database not properly configured, " +
+              "can't find txn_state from TXNS.");
+        } else {
+          numOpenTxns = rs.getLong(1);
+        }
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        LOG.info("Failed to update number of open transactions");
+        checkRetryable(dbConn, e, "countOpenTxns()");
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      countOpenTxns();
+    }
+  }
+
   private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
     if (connPool != null) return;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 12be862..5b56aaf 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -77,6 +77,12 @@ public interface TxnStore {
   public GetOpenTxnsResponse getOpenTxns() throws MetaException;
 
   /**
+   * Refresh the count of currently open transactions (the count is not returned to the caller).
+   * @throws MetaException
+   */
+  public void countOpenTxns() throws MetaException;
+
+  /**
    * Open a set of transactions
    * @param rqst request to open transactions
    * @return information on opened transactions

http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
deleted file mode 100644
index f513d0f..0000000
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-
-/**
- * Tests for TxnHandler.
- */
-public class TestCompactionTxnHandler {
-
-  private HiveConf conf = new HiveConf();
-  private TxnStore txnHandler;
-
-  public TestCompactionTxnHandler() throws Exception {
-    TxnDbUtil.setConfValues(conf);
-    tearDown();
-  }
-
-  @Test
-  public void testFindNextToCompact() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    long now = System.currentTimeMillis();
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    assertEquals("foo", ci.dbname);
-    assertEquals("bar", ci.tableName);
-    assertEquals("ds=today", ci.partName);
-    assertEquals(CompactionType.MINOR, ci.type);
-    assertNull(ci.runAs);
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    txnHandler.setRunAs(ci.id, "bob");
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(1, compacts.size());
-    ShowCompactResponseElement c = compacts.get(0);
-    assertEquals("foo", c.getDbname());
-    assertEquals("bar", c.getTablename());
-    assertEquals("ds=today", c.getPartitionname());
-    assertEquals(CompactionType.MINOR, c.getType());
-    assertEquals("working", c.getState());
-    assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now);
-    assertEquals("fred", c.getWorkerid());
-    assertEquals("bob", c.getRunAs());
-  }
-
-  @Test
-  public void testFindNextToCompact2() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-
-    rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=yesterday");
-    txnHandler.compact(rqst);
-
-    long now = System.currentTimeMillis();
-    boolean expectToday = false;
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    assertEquals("foo", ci.dbname);
-    assertEquals("bar", ci.tableName);
-    if ("ds=today".equals(ci.partName)) expectToday = false;
-    else if ("ds=yesterday".equals(ci.partName)) expectToday = true;
-    else fail("partition name should have been today or yesterday but was " + ci.partName);
-    assertEquals(CompactionType.MINOR, ci.type);
-
-    ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    assertEquals("foo", ci.dbname);
-    assertEquals("bar", ci.tableName);
-    if (expectToday) assertEquals("ds=today", ci.partName);
-    else assertEquals("ds=yesterday", ci.partName);
-    assertEquals(CompactionType.MINOR, ci.type);
-
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(2, compacts.size());
-    for (ShowCompactResponseElement e : compacts) {
-      assertEquals("working", e.getState());
-      assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now);
-      assertEquals("fred", e.getWorkerid());
-    }
-  }
-
-  @Test
-  public void testFindNextToCompactNothingToCompact() throws Exception {
-    assertNull(txnHandler.findNextToCompact("fred"));
-  }
-
-  @Test
-  public void testMarkCompacted() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-
-    txnHandler.markCompacted(ci);
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(1, compacts.size());
-    ShowCompactResponseElement c = compacts.get(0);
-    assertEquals("foo", c.getDbname());
-    assertEquals("bar", c.getTablename());
-    assertEquals("ds=today", c.getPartitionname());
-    assertEquals(CompactionType.MINOR, c.getType());
-    assertEquals("ready for cleaning", c.getState());
-    assertNull(c.getWorkerid());
-  }
-
-  @Test
-  public void testFindNextToClean() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    txnHandler.markCompacted(ci);
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-    assertEquals(1, toClean.size());
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(1, compacts.size());
-    ShowCompactResponseElement c = compacts.get(0);
-    assertEquals("foo", c.getDbname());
-    assertEquals("bar", c.getTablename());
-    assertEquals("ds=today", c.getPartitionname());
-    assertEquals(CompactionType.MINOR, c.getType());
-    assertEquals("ready for cleaning", c.getState());
-    assertNull(c.getWorkerid());
-  }
-
-  @Test
-  public void testMarkCleaned() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    rqst.setPartitionname("ds=today");
-    txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    CompactionInfo ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    txnHandler.markCompacted(ci);
-    assertNull(txnHandler.findNextToCompact("fred"));
-
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-    assertEquals(1, toClean.size());
-    assertNull(txnHandler.findNextToCompact("fred"));
-    txnHandler.markCleaned(ci);
-    assertNull(txnHandler.findNextToCompact("fred"));
-    assertEquals(0, txnHandler.findReadyToClean().size());
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    assertEquals(1, rsp.getCompactsSize());
-    assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
-  }
-
-  @Test
-  public void testRevokeFromLocalWorkers() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
-    assertNotNull(txnHandler.findNextToCompact("bob-193892"));
-    assertNotNull(txnHandler.findNextToCompact("fred-193893"));
-    txnHandler.revokeFromLocalWorkers("fred");
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(3, compacts.size());
-    boolean sawWorkingBob = false;
-    int initiatedCount = 0;
-    for (ShowCompactResponseElement c : compacts) {
-      if (c.getState().equals("working")) {
-        assertEquals("bob-193892", c.getWorkerid());
-        sawWorkingBob = true;
-      } else if (c.getState().equals("initiated")) {
-        initiatedCount++;
-      } else {
-        fail("Unexpected state");
-      }
-    }
-    assertTrue(sawWorkingBob);
-    assertEquals(2, initiatedCount);
-  }
-
-  @Test
-  public void testRevokeTimedOutWorkers() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-
-    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
-    Thread.sleep(200);
-    assertNotNull(txnHandler.findNextToCompact("fred-193892"));
-    txnHandler.revokeTimedoutWorkers(100);
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(2, compacts.size());
-    boolean sawWorking = false, sawInitiated = false;
-    for (ShowCompactResponseElement c : compacts) {
-      if (c.getState().equals("working"))  sawWorking = true;
-      else if (c.getState().equals("initiated")) sawInitiated = true;
-      else fail("Unexpected state");
-    }
-    assertTrue(sawWorking);
-    assertTrue(sawInitiated);
-  }
-
-  @Test
-  public void testFindPotentialCompactions() throws Exception {
-    // Test that committing unlocks
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
-        "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.UPDATE);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
-        "mydb");
-    comp.setTablename("yourtable");
-    comp.setPartitionname("mypartition");
-    comp.setOperationType(DataOperationType.UPDATE);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.commitTxn(new CommitTxnRequest(txnid));
-    assertEquals(0, txnHandler.numLocksInLockTable());
-
-    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
-    assertEquals(2, potentials.size());
-    boolean sawMyTable = false, sawYourTable = false;
-    for (CompactionInfo ci : potentials) {
-      sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") &&
-          ci.partName ==  null);
-      sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") &&
-          ci.partName.equals("mypartition"));
-    }
-    assertTrue(sawMyTable);
-    assertTrue(sawYourTable);
-  }
-
-  // TODO test changes to mark cleaned to clean txns and txn_components
-
-  @Test
-  public void testMarkCleanedCleansTxnsAndTxnComponents()
-      throws Exception {
-    long txnid = openTxn();
-    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
-        "mydb");
-    comp.setTablename("mytable");
-    comp.setOperationType(DataOperationType.INSERT);
-    List<LockComponent> components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    LockResponse res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
-    txnid = openTxn();
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("yourtable");
-    comp.setOperationType(DataOperationType.DELETE);
-    components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
-    txnid = openTxn();
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("foo");
-    comp.setPartitionname("bar");
-    comp.setOperationType(DataOperationType.UPDATE);
-    components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
-    comp.setTablename("foo");
-    comp.setPartitionname("baz");
-    comp.setOperationType(DataOperationType.UPDATE);
-    components = new ArrayList<LockComponent>(1);
-    components.add(comp);
-    req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
-    res = txnHandler.lock(req);
-    assertTrue(res.getState() == LockState.ACQUIRED);
-    txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
-    CompactionInfo ci = new CompactionInfo();
-
-    // Now clean them and check that they are removed from the count.
-    CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
-    txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    txnHandler.markCompacted(ci);
-
-    List<CompactionInfo> toClean = txnHandler.findReadyToClean();
-    assertEquals(1, toClean.size());
-    txnHandler.markCleaned(ci);
-
-    // Check that we are cleaning up the empty aborted transactions
-    GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
-    assertEquals(3, txnList.getOpen_txnsSize());
-    txnHandler.cleanEmptyAbortedTxns();
-    txnList = txnHandler.getOpenTxns();
-    assertEquals(2, txnList.getOpen_txnsSize());
-
-    rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
-    rqst.setPartitionname("bar");
-    txnHandler.compact(rqst);
-    assertEquals(0, txnHandler.findReadyToClean().size());
-    ci = txnHandler.findNextToCompact("fred");
-    assertNotNull(ci);
-    txnHandler.markCompacted(ci);
-
-    toClean = txnHandler.findReadyToClean();
-    assertEquals(1, toClean.size());
-    txnHandler.markCleaned(ci);
-
-    txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
-    txnHandler.cleanEmptyAbortedTxns();
-    txnList = txnHandler.getOpenTxns();
-    assertEquals(3, txnList.getOpen_txnsSize());
-  }
-
-  @Test
-  public void addDynamicPartitions() throws Exception {
-    String dbName = "default";
-    String tableName = "adp_table";
-    OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
-    long txnId = openTxns.getTxn_ids().get(0);
-    // lock a table, as in dynamic partitions
-    LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
-    lc.setTablename(tableName);
-    DataOperationType dop = DataOperationType.UPDATE; 
-    lc.setOperationType(dop);
-    LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
-    lr.setTxnid(txnId);
-    LockResponse lock = txnHandler.lock(lr);
-    assertEquals(LockState.ACQUIRED, lock.getState());
-
-    AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName,
-      Arrays.asList("ds=yesterday", "ds=today"));
-    adp.setOperationType(dop);
-    txnHandler.addDynamicPartitions(adp);
-    txnHandler.commitTxn(new CommitTxnRequest(txnId));
-
-    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000);
-    assertEquals(2, potentials.size());
-    SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials);
-
-    int i = 0;
-    for (CompactionInfo ci : sorted) {
-      assertEquals(dbName, ci.dbname);
-      assertEquals(tableName, ci.tableName);
-      switch (i++) {
-        case 0: assertEquals("ds=today", ci.partName); break;
-        case 1: assertEquals("ds=yesterday", ci.partName); break;
-      default: throw new RuntimeException("What?");
-      }
-    }
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    TxnDbUtil.prepDb();
-    txnHandler = TxnUtils.getTxnStore(conf);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    TxnDbUtil.cleanDb();
-  }
-
-  private long openTxn() throws MetaException {
-    List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
-    return txns.get(0);
-  }
-
-}