You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/03/25 00:28:27 UTC

[1/3] hive git commit: HIVE-13344 - port HIVE-12902 to 1.x line (Eugene Koifman, reviewed by Wei Zheng)

Repository: hive
Updated Branches:
  refs/heads/branch-1 db2efe42a -> 178708231


http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 5545574..cac4623 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -20,15 +20,31 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreThread;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -39,7 +55,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 import org.apache.thrift.TException;
 
@@ -62,7 +82,7 @@ public abstract class CompactorTest {
   static final private String CLASS_NAME = CompactorTest.class.getName();
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
-  protected CompactionTxnHandler txnHandler;
+  protected TxnStore txnHandler;
   protected IMetaStoreClient ms;
   protected long sleepTime = 1000;
   protected HiveConf conf;
@@ -75,7 +95,7 @@ public abstract class CompactorTest {
     TxnDbUtil.setConfValues(conf);
     TxnDbUtil.cleanDb();
     ms = new HiveMetaStoreClient(conf);
-    txnHandler = new CompactionTxnHandler(conf);
+    txnHandler = TxnUtils.getTxnStore(conf);
     tmpdir = new File(System.getProperty("java.io.tmpdir") +
         System.getProperty("file.separator") + "compactor_test_tables");
     tmpdir.mkdir();

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 913c8bc..17634f0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -25,6 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -73,7 +73,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
@@ -105,7 +105,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -135,7 +135,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
@@ -174,7 +174,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -329,7 +329,7 @@ public class TestCleaner extends CompactorTest {
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 
   @Test
@@ -403,7 +403,7 @@ public class TestCleaner extends CompactorTest {
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 
   @Test
@@ -429,7 +429,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
 
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, p);
@@ -460,7 +460,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 
   @Test
@@ -488,7 +488,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
   @Override
   boolean useHive130DeltaDirName() {

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 07f477d..f84bd7e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -205,12 +204,12 @@ public class TestInitiator extends CompactorTest {
     LockResponse res = txnHandler.lock(req);
     txnHandler.abortTxn(new AbortTxnRequest(txnid));
 
-    for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE  + 50; i++) {
+    for (int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE  + 50; i++) {
       txnid = openTxn();
       txnHandler.abortTxn(new AbortTxnRequest(txnid));
     }
     GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
-    Assert.assertEquals(TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
+    Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
 
     startInitiator();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 8862402..381eeb3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -22,13 +22,18 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -935,7 +940,7 @@ public class TestWorker extends CompactorTest {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(compacts.get(0).getState()));
   }
 
   @Test
@@ -960,6 +965,6 @@ public class TestWorker extends CompactorTest {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
-    Assert.assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
+    Assert.assertTrue(TxnStore.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
   }
 }


[3/3] hive git commit: HIVE-11388 - Allow ACID Compactor components to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)

Posted by ek...@apache.org.
HIVE-11388 - Allow ACID Compactor components to run in multiple metastores (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17870823
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17870823
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17870823

Branch: refs/heads/branch-1
Commit: 178708231e09bb2c08aa05cf9979efd6d3cd542c
Parents: c829505
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Mar 24 16:22:21 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Mar 24 16:22:21 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/ServerUtils.java  |  14 ++
 .../deployers/config/hive/hive-site.mysql.xml   |  24 ++-
 .../hive/metastore/txn/CompactionInfo.java      |   4 +
 .../metastore/txn/CompactionTxnHandler.java     |   7 +-
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |  20 ++-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 169 ++++++++++++++++++-
 .../hadoop/hive/metastore/txn/TxnStore.java     |  33 +++-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |   4 +-
 .../metastore/txn/ValidCompactorTxnList.java    |   2 +-
 .../hive/metastore/txn/TestTxnHandler.java      |  93 ++++++++++
 .../ql/txn/AcidCompactionHistoryService.java    |   7 +
 .../hive/ql/txn/AcidHouseKeeperService.java     |   7 +
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  62 ++++++-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  19 ++-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   7 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       |   6 +
 16 files changed, 445 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index a284f18..4141770 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -24,6 +24,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 /**
  * ServerUtils (specific to HiveServer version 1)
  */
@@ -47,4 +50,15 @@ public class ServerUtils {
     }
   }
 
+  /**
+   * @return name of current host
+   */
+  public static String hostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Unable to resolve my host name " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
----------------------------------------------------------------------
diff --git a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
index b6f1ab7..387da6c 100644
--- a/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
+++ b/hcatalog/src/test/e2e/templeton/deployers/config/hive/hive-site.mysql.xml
@@ -62,13 +62,14 @@
         <name>hive.exec.dynamic.partition.mode</name>
         <value>nonstrict</value>
     </property>
+
     <property>
         <name>hive.compactor.initiator.on</name>
-        <value>false</value>
+        <value>true</value>
     </property>
     <property>
         <name>hive.compactor.worker.threads</name>
-        <value>2</value>
+        <value>5</value>
     </property>
     <property>
         <name>hive.timedout.txn.reaper.start</name>
@@ -81,9 +82,24 @@
     -->
     <property>
         <name>hive.timedout.txn.reaper.interval</name>
-        <value>30s</value>
+        <value>1s</value>
+    </property>
+    <property>
+        <name>hive.compactor.history.reaper.interval</name>
+        <value>1s</value>
+    </property>
+    <property>
+        <name>hive.compactor.cleaner.run.interval</name>
+        <value>1s</value>
+    </property>
+    <property>
+        <name>hive.compactor.check.interval</name>
+        <value>1s</value>
+    </property>
+    <property>
+        <name>hive.compactor.delta.num.threshold</name>
+        <value>2</value>
     </property>
-
     <!--end ACID related properties-->
 <!--
     <property>

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 73255d2..bea1473 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -39,6 +40,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
   public boolean tooManyAborts = false;
   /**
    * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) 
+   * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition.
+   * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and
+   * {@link ValidCompactorTxnList#highWatermark}
    */
   public long highestTxnId;
   byte[] metaInfo;

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index f7c738a..cdff357 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -161,6 +161,8 @@ class CompactionTxnHandler extends TxnHandler {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
+      Statement updStmt = null;
       ResultSet rs = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -174,6 +176,7 @@ class CompactionTxnHandler extends TxnHandler {
           dbConn.rollback();
           return null;
         }
+        updStmt = dbConn.createStatement();
         do {
           CompactionInfo info = new CompactionInfo();
           info.id = rs.getLong(1);
@@ -187,7 +190,7 @@ class CompactionTxnHandler extends TxnHandler {
             "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
             " AND cq_state='" + INITIATED_STATE + "'";
           LOG.debug("Going to execute update <" + s + ">");
-          int updCount = stmt.executeUpdate(s);
+          int updCount = updStmt.executeUpdate(s);
           if(updCount == 1) {
             dbConn.commit();
             return info;
@@ -211,6 +214,7 @@ class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        closeStmt(updStmt);
         close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
@@ -627,6 +631,7 @@ class CompactionTxnHandler extends TxnHandler {
 
   /**
    * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+   * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids.
    */
   public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
     Connection dbConn = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 42415ac..56c9ed8 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -68,7 +68,7 @@ public final class TxnDbUtil {
     Connection conn = null;
     Statement stmt = null;
     try {
-      conn = getConnection();
+      conn = getConnection(true);
       stmt = conn.createStatement();
       stmt.execute("CREATE TABLE TXNS (" +
           "  TXN_ID bigint PRIMARY KEY," +
@@ -140,8 +140,13 @@ public final class TxnDbUtil {
         " CC_HIGHEST_TXN_ID bigint," +
         " CC_META_INFO varchar(2048) for bit data," +
         " CC_HADOOP_JOB_ID varchar(32))");
-
-      conn.commit();
+      
+      stmt.execute("CREATE TABLE AUX_TABLE (" +
+        "  MT_KEY1 varchar(128) NOT NULL," +
+        "  MT_KEY2 bigint NOT NULL," +
+        "  MT_COMMENT varchar(255)," +
+        "  PRIMARY KEY(MT_KEY1, MT_KEY2)" +
+        ")");
     } catch (SQLException e) {
       try {
         conn.rollback();
@@ -166,7 +171,7 @@ public final class TxnDbUtil {
     Connection conn = null;
     Statement stmt = null;
     try {
-      conn = getConnection();
+      conn = getConnection(true);
       stmt = conn.createStatement();
 
       // We want to try these, whether they succeed or fail.
@@ -185,7 +190,7 @@ public final class TxnDbUtil {
       dropTable(stmt, "COMPACTION_QUEUE");
       dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
       dropTable(stmt, "COMPLETED_COMPACTIONS");
-      conn.commit();
+      dropTable(stmt, "AUX_TABLE");
     } finally {
       closeResources(conn, stmt, null);
     }
@@ -249,6 +254,9 @@ public final class TxnDbUtil {
   }
 
   static Connection getConnection() throws Exception {
+    return getConnection(false);
+  }
+  static Connection getConnection(boolean isAutoCommit) throws Exception {
     HiveConf conf = new HiveConf();
     String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
     Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
@@ -260,7 +268,7 @@ public final class TxnDbUtil {
     prop.setProperty("user", user);
     prop.setProperty("password", passwd);
     Connection conn = driver.connect(driverUrl, prop);
-    conn.setAutoCommit(false);
+    conn.setAutoCommit(isAutoCommit);
     return conn;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9789371..a3b0751 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -23,6 +23,8 @@ import com.jolbox.bonecp.BoneCPDataSource;
 import org.apache.commons.dbcp.ConnectionFactory;
 import org.apache.commons.dbcp.DriverManagerConnectionFactory;
 import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.commons.logging.Log;
@@ -45,6 +47,8 @@ import javax.sql.DataSource;
 import java.io.IOException;
 import java.sql.*;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -87,7 +91,7 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore {
+abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   static final protected char INITIATED_STATE = 'i';
   static final protected char WORKING_STATE = 'w';
@@ -139,6 +143,12 @@ abstract class TxnHandler implements TxnStore {
    * Derby specific concurrency control
    */
   private static final ReentrantLock derbyLock = new ReentrantLock(true);
+  /**
+   * must be static since even in UT there may be > 1 instance of TxnHandler
+   * (e.g. via Compactor services)
+   */
+  private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
+  private static final String hostname = ServerUtils.hostname();
 
   // Private methods should never catch SQLException and then throw MetaException.  The public
   // methods depend on SQLException coming back so they can detect and handle deadlocks.  Private
@@ -563,7 +573,7 @@ abstract class TxnHandler implements TxnStore {
    * @throws MetaException
    */
   private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
-    String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? "AND TXN_STATE=" + quoteChar(txnState) : "");
+    String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
     ResultSet rs = stmt.executeQuery(addForUpdateClause(query));
     if(rs.next()) {
       return rs;
@@ -1437,14 +1447,14 @@ abstract class TxnHandler implements TxnStore {
     }
   }
 
-  void rollbackDBConn(Connection dbConn) {
+  static void rollbackDBConn(Connection dbConn) {
     try {
       if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
     } catch (SQLException e) {
       LOG.warn("Failed to rollback db connection " + getMessage(e));
     }
   }
-  protected void closeDbConn(Connection dbConn) {
+  protected static void closeDbConn(Connection dbConn) {
     try {
       if (dbConn != null && !dbConn.isClosed()) {
         dbConn.close();
@@ -1458,7 +1468,7 @@ abstract class TxnHandler implements TxnStore {
    * Close statement instance.
    * @param stmt statement instance.
    */
-  protected void closeStmt(Statement stmt) {
+  protected static void closeStmt(Statement stmt) {
     try {
       if (stmt != null && !stmt.isClosed()) stmt.close();
     } catch (SQLException e) {
@@ -1470,7 +1480,7 @@ abstract class TxnHandler implements TxnStore {
    * Close the ResultSet.
    * @param rs may be {@code null}
    */
-  void close(ResultSet rs) {
+  static void close(ResultSet rs) {
     try {
       if (rs != null && !rs.isClosed()) {
         rs.close();
@@ -1484,7 +1494,7 @@ abstract class TxnHandler implements TxnStore {
   /**
    * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}
    */
-  void close(ResultSet rs, Statement stmt, Connection dbConn) {
+  static void close(ResultSet rs, Statement stmt, Connection dbConn) {
     close(rs);
     closeStmt(stmt);
     closeDbConn(dbConn);
@@ -2635,6 +2645,40 @@ abstract class TxnHandler implements TxnStore {
     }
     return false;
   }
+  private boolean isDuplicateKeyError(SQLException ex) {
+    switch (dbProduct) {
+      case DERBY:
+        if("23505".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      case MYSQL:
+        if(ex.getErrorCode() == 1022 && "23000".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      case SQLSERVER:
+        //2627 is unique constaint violation incl PK, 2601 - unique key
+        if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      case ORACLE:
+        if(ex.getErrorCode() == 1 && "23000".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      case POSTGRES:
+        //http://www.postgresql.org/docs/8.1/static/errcodes-appendix.html
+        if("23505".equals(ex.getSQLState())) {
+          return true;
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected DB type: " + dbProduct + "; " + getMessage(ex));
+    }
+    return false;
+  }
   private static String getMessage(SQLException ex) {
     return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
   }
@@ -2709,4 +2753,115 @@ abstract class TxnHandler implements TxnStore {
       derbyLock.unlock();
     }
   }
+  @Override
+  public MutexAPI getMutexAPI() {
+    return this;
+  }
+
+  @Override
+  public LockHandle acquireLock(String key) throws MetaException {
+    /**
+     * The implementation here is a bit kludgey but done so that code exercised by unit tests
+     * (which run against Derby which has no support for select for update) is as similar to
+     * production code as possible.
+     * In particular, with Derby we always run in a single process with a single metastore and
+     * the absence of For Update is handled via a Semaphore.  The later would strictly speaking
+     * make the SQL statments below unnecessary (for Derby), but then they would not be tested.
+     */
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        String sqlStmt = addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0");
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("About to execute SQL: " + sqlStmt);
+        }
+        rs = stmt.executeQuery(sqlStmt);
+        if (!rs.next()) {
+          close(rs);
+          try {
+            stmt.executeUpdate("insert into AUX_TABLE(MT_KEY1,MT_KEY2) values(" + quoteString(key) + ", 0)");
+            dbConn.commit();
+          } catch (SQLException ex) {
+            if (!isDuplicateKeyError(ex)) {
+              throw new RuntimeException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex), ex);
+            }
+          }
+          rs = stmt.executeQuery(sqlStmt);
+          if (!rs.next()) {
+            throw new IllegalStateException("Unable to lock " + quoteString(key) + ".  Expected row in AUX_TABLE is missing.");
+          }
+        }
+        Semaphore derbySemaphore = null;
+        if(dbProduct == DatabaseProduct.DERBY) {
+          derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
+          derbySemaphore =  derbyKey2Lock.get(key);
+          derbySemaphore.acquire();
+        }
+        LOG.info(quoteString(key) + " locked by " + quoteString(TxnHandler.hostname));
+        //OK, so now we have a lock
+        return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
+      } catch (SQLException ex) {
+        rollbackDBConn(dbConn);
+        close(rs, stmt, dbConn);
+        checkRetryable(dbConn, ex, "acquireLock(" + key + ")");
+        throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex));
+      }
+      catch(InterruptedException ex) {
+        rollbackDBConn(dbConn);
+        close(rs, stmt, dbConn);
+        throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + ex.getMessage() + StringUtils.stringifyException(ex));
+      }
+      finally {
+        unlockInternal();
+      }
+    }
+    catch(RetryException ex) {
+      acquireLock(key);
+    }
+    throw new MetaException("This can't happen because checkRetryable() has a retry limit");
+  }
+  public void acquireLock(String key, LockHandle handle) {
+    //the idea is that this will use LockHandle.dbConn
+    throw new NotImplementedException();
+  }
+  private static final class LockHandleImpl implements LockHandle {
+    private final Connection dbConn;
+    private final Statement stmt;
+    private final ResultSet rs;
+    private final Semaphore derbySemaphore;
+    private final List<String> keys = new ArrayList<>();
+    LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key, Semaphore derbySemaphore) {
+      this.dbConn = conn;
+      this.stmt = stmt;
+      this.rs = rs;
+      this.derbySemaphore = derbySemaphore;
+      if(derbySemaphore != null) {
+        //oterwise it may later release permit acquired by someone else
+        assert derbySemaphore.availablePermits() == 0 : "Expected locked Semaphore";
+      }
+      keys.add(key);
+    }
+    void addKey(String key) {
+      //keys.add(key);
+      //would need a list of (stmt,rs) pairs - 1 for each key
+      throw new NotImplementedException();
+    }
+    
+    @Override
+    public void releaseLocks() {
+      rollbackDBConn(dbConn);
+      close(rs, stmt, dbConn);
+      if(derbySemaphore != null) {
+        derbySemaphore.release();
+      }
+      for(String key : keys) {
+        LOG.info(quoteString(key) + " unlocked by " + quoteString(TxnHandler.hostname));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6fc6ed9..3aac11b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -74,6 +74,7 @@ import java.util.Set;
 @InterfaceStability.Evolving
 public interface TxnStore {
 
+  public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
   // Compactor states (Should really be enum)
   static final public String INITIATED_RESPONSE = "initiated";
   static final public String WORKING_RESPONSE = "working";
@@ -355,10 +356,40 @@ public interface TxnStore {
    */
   public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
 
-
   @VisibleForTesting
   public int numLocksInLockTable() throws SQLException, MetaException;
 
   @VisibleForTesting
   long setTimeout(long milliseconds);
+
+  public MutexAPI getMutexAPI();
+
+  /**
+   * This is primarily designed to provide coarse grained mutex support to operations running
+   * inside the Metastore (of which there could be several instances).  The initial goal is to 
+   * ensure that various sub-processes of the Compactor don't step on each other.
+   * 
+   * In RDMBS world each {@code LockHandle} uses a java.sql.Connection so use it sparingly.
+   */
+  public static interface MutexAPI {
+    /**
+     * The {@code key} is name of the lock. Will acquire and exclusive lock or block.  It retuns
+     * a handle which must be used to release the lock.  Each invocation returns a new handle.
+     */
+    public LockHandle acquireLock(String key) throws MetaException;
+
+    /**
+     * Same as {@link #acquireLock(String)} but takes an already existing handle as input.  This 
+     * will associate the lock on {@code key} with the same handle.  All locks associated with
+     * the same handle will be released together.
+     * @param handle not NULL
+     */
+    public void acquireLock(String key, LockHandle handle) throws MetaException;
+    public static interface LockHandle {
+      /**
+       * Releases all locks associcated with this handle.
+       */
+      public void releaseLocks();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index f60e34b..4c14eef 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -75,8 +75,8 @@ public class TxnUtils {
     int i = 0;
     for (TxnInfo txn : txns.getOpen_txns()) {
       if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
-      exceptions[i++] = txn.getId();
-    }
+      exceptions[i++] = txn.getId();//todo: only add Aborted
+    }//remove all exceptions < minOpenTxn
     highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
     return new ValidCompactorTxnList(exceptions, -1, highWater);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
index 648fd49..30bdfa7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
 import java.util.Arrays;
 
 /**
- * And implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
  * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
  * is committed or aborted.  Additionally it will return none if there are any open transactions
  * below the max transaction given, since we don't want to compact above open transactions.  For

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index b8cab71..6033c15 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.junit.*;
+import org.apache.hadoop.util.StringUtils;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -33,6 +34,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertFalse;
@@ -1214,6 +1216,97 @@ public class TestTxnHandler {
     }
   }
 
+  /**
+   * This cannnot be run against Derby (thus in UT) but it can run againt MySQL.
+   * 1. add to metastore/pom.xml
+   *     <dependency>
+   *      <groupId>mysql</groupId>
+   *      <artifactId>mysql-connector-java</artifactId>
+   *      <version>5.1.30</version>
+   *     </dependency>
+   * 2. Hack in the c'tor of this class
+   *     conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
+   *      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
+   *      conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
+   *      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
+   * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
+   *      
+   */
+  @Ignore("multiple threads wedge Derby")
+  @Test
+  public void testMutexAPI() throws Exception {
+    final TxnStore.MutexAPI api =  txnHandler.getMutexAPI();
+    final AtomicInteger stepTracker = new AtomicInteger(0);
+    /**
+     * counter = 0;
+     * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock 
+     * Thread2 counter=2, lock (and block), inc counter, should be 4
+     */
+    Thread t1 = new Thread("MutexTest1") {
+      public void run() {
+        try {
+          stepTracker.incrementAndGet();//now 1
+          TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+          Thread.sleep(4000);
+          //stepTracker should now be 2 which indicates t2 has started
+          Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get());
+          stepTracker.incrementAndGet();//now 3
+          handle.releaseLocks();
+        }
+        catch(Exception ex) {
+          throw new RuntimeException(ex.getMessage(), ex);
+        }
+      }
+    };
+    t1.setDaemon(true);
+    ErrorHandle ueh1 = new ErrorHandle();
+    t1.setUncaughtExceptionHandler(ueh1);
+    Thread t2 = new Thread("MutexTest2") {
+      public void run() {
+        try {
+          stepTracker.incrementAndGet();//now 2
+          //this should block until t1 unlocks
+          TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
+          stepTracker.incrementAndGet();//now 4
+          Assert.assertEquals(4, stepTracker.get());
+          handle.releaseLocks();
+          stepTracker.incrementAndGet();//now 5
+        }
+        catch(Exception ex) {
+          throw new RuntimeException(ex.getMessage(), ex);
+        }
+      }
+    };
+    t2.setDaemon(true);
+    ErrorHandle ueh2 = new ErrorHandle();
+    t2.setUncaughtExceptionHandler(ueh2);
+    t1.start();
+    try {
+      Thread.sleep(1000);
+    }
+    catch(InterruptedException ex) {
+      LOG.info("Sleep was interrupted");
+    }
+    t2.start();
+    t1.join(6000);//so that test doesn't block
+    t2.join(6000);
+
+    if(ueh1.error != null) {
+      Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false);
+    }
+    if (ueh2.error != null) {
+      Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false);
+    }
+    Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
+  }
+  private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
+    Throwable error = null;
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
+      error = e;
+    }
+  }
   private void updateTxns(Connection conn) throws SQLException {
     Statement stmt = conn.createStatement();
     stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index 59c8fe4..5d9e7be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -61,7 +61,9 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
     
     @Override
     public void run() {
+      TxnStore.MutexAPI.LockHandle handle = null;
       try {
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name());
         long startTime = System.currentTimeMillis();
         txnHandler.purgeCompactionHistory();
         int count = isAliveCounter.incrementAndGet(); 
@@ -70,6 +72,11 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
       catch(Throwable t) {
         LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
       }
+      finally {
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index de74a7b..f39df17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -61,7 +61,9 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
     }
     @Override
     public void run() {
+      TxnStore.MutexAPI.LockHandle handle = null;
       try {
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
         long startTime = System.currentTimeMillis();
         txnHandler.performTimeOuts();
         int count = isAliveCounter.incrementAndGet();
@@ -70,6 +72,11 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
       catch(Throwable t) {
         LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t);
       }
+      finally {
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 33580fd..1e6e8a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -72,11 +73,13 @@ public class Cleaner extends CompactorThread {
       // and if so remembers that and then sets it to true at the end.  We have to check here
       // first to make sure we go through a complete iteration of the loop before resetting it.
       boolean setLooped = !looped.get();
-      long startedAt = System.currentTimeMillis();
+      TxnStore.MutexAPI.LockHandle handle = null;
+      long startedAt = -1;
       // Make sure nothing escapes this run method and kills the metastore at large,
       // so wrap it in a big catch Throwable statement.
       try {
-
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+        startedAt = System.currentTimeMillis();
         // First look for all the compactions that are waiting to be cleaned.  If we have not
         // seen an entry before, look for all the locks held on that table or partition and
         // record them.  We will then only clean the partition once all of those locks have been
@@ -86,6 +89,31 @@ public class Cleaner extends CompactorThread {
         // done the compaction will read the more up to date version of the data (either in a
         // newer delta or in a newer base).
         List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+        {
+          /**
+           * Since there may be more than 1 instance of Cleaner running we may have state info
+           * for items which were cleaned by instances.  Here we remove them.
+           *
+           * In the long run if we add end_time to compaction_queue, then we can check that
+           * hive_locks.acquired_at > compaction_queue.end_time + safety_buffer in which case
+           * we know the lock owner is reading files created by this compaction or later.
+           * The advantage is that we don't have to store the locks.
+           */
+          Set<Long> currentToCleanSet = new HashSet<>();
+          for (CompactionInfo ci : toClean) {
+            currentToCleanSet.add(ci.id);
+          }
+          Set<Long> cleanPerformedByOthers = new HashSet<>();
+          for (long id : compactId2CompactInfoMap.keySet()) {
+            if (!currentToCleanSet.contains(id)) {
+              cleanPerformedByOthers.add(id);
+            }
+          }
+          for (long id : cleanPerformedByOthers) {
+            compactId2CompactInfoMap.remove(id);
+            compactId2LockMap.remove(id);
+          }
+        }
         if (toClean.size() > 0 || compactId2LockMap.size() > 0) {
           ShowLocksResponse locksResponse = txnHandler.showLocks(new ShowLocksRequest());
 
@@ -119,6 +147,7 @@ public class Cleaner extends CompactorThread {
                 // Remember to remove this when we're out of the loop,
                 // we can't do it in the loop or we'll get a concurrent modification exception.
                 compactionsCleaned.add(queueEntry.getKey());
+                //Future thought: this may be expensive so consider having a thread pool run in parallel
                 clean(compactId2CompactInfoMap.get(queueEntry.getKey()));
               } else {
                 // Remove the locks we didn't see so we don't look for them again next time
@@ -140,6 +169,11 @@ public class Cleaner extends CompactorThread {
         LOG.error("Caught an exception in the main loop of compactor cleaner, " +
             StringUtils.stringifyException(t));
       }
+      finally {
+        if (handle != null) {
+          handle.releaseLocks();
+        }
+      }
       if (setLooped) {
         looped.set(true);
       }
@@ -206,10 +240,24 @@ public class Cleaner extends CompactorThread {
       StorageDescriptor sd = resolveStorageDescriptor(t, p);
       final String location = sd.getLocation();
 
-      // Create a bogus validTxnList with a high water mark set to MAX_LONG and no open
-      // transactions.  This assures that all deltas are treated as valid and all we return are
-      // obsolete files.
-      final ValidTxnList txnList = new ValidReadTxnList();
+      /**
+       * Each Compaction only compacts as far as the highest txn id such that all txns below it
+       * are resolved (i.e. not opened).  This is what "highestTxnId" tracks.  This is only tracked
+       * since Hive 1.3.0/2.0 - thus may be 0.  See ValidCompactorTxnList and uses for more info.
+       * 
+       * We only want to clean up to the highestTxnId - otherwise we risk deleteing deltas from
+       * under an active reader.
+       * 
+       * Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a 
+       * clean request for D2.  
+       * Cleaner checks existing locks and finds none.
+       * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
+       * completes which creates D4.
+       * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
+       * unless ValidTxnList is "capped" at highestTxnId.
+       */
+      final ValidTxnList txnList = ci.highestTxnId > 0 ? 
+        new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList();
 
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, txnList);
@@ -249,7 +297,7 @@ public class Cleaner extends CompactorThread {
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
 
     for (Path dead : filesToDelete) {
-      LOG.debug("Doing to delete path " + dead.toString());
+      LOG.debug("Going to delete path " + dead.toString());
       fs.delete(dead, true);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 1898a4d..0e4ba06 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -74,11 +74,15 @@ public class Initiator extends CompactorThread {
       // much easier.  The stop value is only for testing anyway and not used when called from
       // HiveMetaStore.
       do {
-        long startedAt = System.currentTimeMillis();
+        long startedAt = -1;
+        TxnStore.MutexAPI.LockHandle handle = null;
 
         // Wrap the inner parts of the loop in a catch throwable so that any errors in the loop
         // don't doom the entire thread.
-        try {//todo: add method to only get current i.e. skip history - more efficient
+        try {
+          handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+          startedAt = System.currentTimeMillis();
+          //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
           ValidTxnList txns =
               TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
@@ -114,6 +118,8 @@ public class Initiator extends CompactorThread {
               // Check if we already have initiated or are working on a compaction for this partition
               // or table.  If so, skip it.  If we are just waiting on cleaning we can still check,
               // as it may be time to compact again even though we haven't cleaned.
+              //todo: this is not robust.  You can easily run Alter Table to start a compaction between
+              //the time currentCompactions is generated and now
               if (lookForCurrentCompactions(currentCompactions, ci)) {
                 LOG.debug("Found currently initiated or working compaction for " +
                     ci.getFullPartitionName() + " so we will not initiate another compaction");
@@ -134,7 +140,9 @@ public class Initiator extends CompactorThread {
               }
               StorageDescriptor sd = resolveStorageDescriptor(t, p);
               String runAs = findUserToRunAs(sd.getLocation(), t);
-
+              /*Future thought: checkForCompaction will check a lot of file metadata and may be expensive.
+              * Long term we should consider having a thread pool here and running checkForCompactionS
+              * in parallel*/
               CompactionType compactionNeeded = checkForCompaction(ci, txns, sd, runAs);
               if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
@@ -154,6 +162,11 @@ public class Initiator extends CompactorThread {
           LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
               StringUtils.stringifyException(t));
         }
+        finally {
+          if(handle != null) {
+            handle.releaseLocks();
+          }
+        }
 
         long elapsedTime = System.currentTimeMillis() - startedAt;
         if (elapsedTime >= checkInterval || stop.get())  continue;

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 6f8dc35..8394ec6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -229,7 +229,6 @@ public class TestTxnCommands2 {
     }
     Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true);
   }
-  @Ignore("alter table")
   @Test
   public void testAlterTable() throws Exception {
     int[][] tableData = {{1,2}};
@@ -604,7 +603,13 @@ public class TestTxnCommands2 {
   private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
     int lastCount = houseKeeperService.getIsAliveCounter();
     houseKeeperService.start(conf);
+    int maxIter = 10;
+    int iterCount = 0;
     while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+      if(iterCount++ >= maxIter) {
+        //prevent test hangs
+        throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits");
+      }
       try {
         Thread.sleep(100);//make sure it has run at least once
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/17870823/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 99705b4..b355dbe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -194,7 +194,13 @@ public class TestDbTxnManager {
   private void runReaper() throws Exception {
     int lastCount = houseKeeperService.getIsAliveCounter();
     houseKeeperService.start(conf);
+    int maxIter = 10;
+    int iterCount = 0;
     while(houseKeeperService.getIsAliveCounter() <= lastCount) {
+      if(iterCount++ >= maxIter) {
+        //prevent test hangs
+        throw new IllegalStateException("Reaper didn't run after " + iterCount + " waits");
+      }
       try {
         Thread.sleep(100);//make sure it has run at least once
       }


[2/3] hive git commit: HIVE-13344 - port HIVE-12902 to 1.x line (Eugene Koifman, reviewed by Wei Zheng)

Posted by ek...@apache.org.
HIVE-13344 - port HIVE-12902 to 1.x line (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c8295051
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c8295051
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c8295051

Branch: refs/heads/branch-1
Commit: c8295051cc26577dcc1eb17709d4ffc0f9784c5b
Parents: db2efe4
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Mar 24 16:21:07 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Mar 24 16:21:07 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   6 +-
 .../hive/ql/txn/compactor/TestCompactor.java    |  25 +-
 .../hive/metastore/AcidEventListener.java       |  38 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |  14 +-
 .../hive/metastore/HiveMetaStoreClient.java     |   6 +-
 .../metastore/txn/CompactionTxnHandler.java     |  47 +--
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 127 +------
 .../hadoop/hive/metastore/txn/TxnStore.java     | 364 +++++++++++++++++++
 .../hadoop/hive/metastore/txn/TxnUtils.java     | 209 +++++++++++
 .../metastore/txn/TestCompactionTxnHandler.java |   5 +-
 .../hive/metastore/txn/TestTxnHandler.java      | 183 +++++-----
 .../metastore/txn/TestTxnHandlerNegative.java   |   2 +-
 .../ql/txn/AcidCompactionHistoryService.java    |  16 +-
 .../hive/ql/txn/AcidHouseKeeperService.java     |  13 +-
 .../hive/ql/txn/compactor/CompactorThread.java  |   7 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |   8 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   6 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |  22 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       |  12 +-
 .../hive/ql/txn/compactor/CompactorTest.java    |  34 +-
 .../hive/ql/txn/compactor/TestCleaner.java      |  20 +-
 .../hive/ql/txn/compactor/TestInitiator.java    |   7 +-
 .../hive/ql/txn/compactor/TestWorker.java       |  13 +-
 23 files changed, 847 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b78bea2..f84c940 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -39,6 +39,7 @@ import java.util.regex.Pattern;
 
 import javax.security.auth.login.LoginException;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,7 +57,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.HiveCompat;
 
-import com.google.common.base.Joiner;
 
 /**
  * Hive Configuration.
@@ -607,6 +607,10 @@ public class HiveConf extends Configuration {
     METASTORE_RAW_STORE_IMPL("hive.metastore.rawstore.impl", "org.apache.hadoop.hive.metastore.ObjectStore",
         "Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. \n" +
         "This class is used to store and retrieval of raw metadata objects such as table, database"),
+    METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl",
+        "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler",
+        "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore.  This " +
+        "class is used to store and retrieve transactions and locks"),
     METASTORE_CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver",
         "Driver class name for a JDBC metastore"),
     METASTORE_MANAGER_FACTORY_CLASS("javax.jdo.PersistenceManagerFactoryClass",

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 9c0f374..37bbab8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -8,7 +8,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -20,10 +19,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -187,7 +186,7 @@ public class TestCompactor {
     initiator.init(stop, new AtomicBoolean());
     initiator.run();
 
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(4, compacts.size());
@@ -290,7 +289,7 @@ public class TestCompactor {
     initiator.init(stop, new AtomicBoolean());
     initiator.run();
 
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(4, compacts.size());
@@ -363,7 +362,7 @@ public class TestCompactor {
     execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
       tblName + " after load:");
 
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     CompactionInfo ci = new CompactionInfo("default", tblName, "bkt=0", CompactionType.MAJOR);
     LOG.debug("List of stats columns before analyze Part1: " + txnHandler.findColumnsWithStats(ci));
     Worker.StatsUpdater su = Worker.StatsUpdater.init(ci, colNames, conf,
@@ -498,7 +497,7 @@ public class TestCompactor {
     initiator.init(stop, new AtomicBoolean());
     initiator.run();
 
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(2, compacts.size());
@@ -538,7 +537,7 @@ public class TestCompactor {
     initiator.init(stop, new AtomicBoolean());
     initiator.run();
 
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(2, compacts.size());
@@ -580,7 +579,7 @@ public class TestCompactor {
     initiator.init(stop, new AtomicBoolean());
     initiator.run();
 
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
@@ -620,7 +619,7 @@ public class TestCompactor {
       writeBatch(connection, writer, true);
 
       // Now, compact
-      CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
       txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
       Worker t = new Worker();
       t.setThreadId((int) t.getId());
@@ -682,7 +681,7 @@ public class TestCompactor {
       writeBatch(connection, writer, true);
 
       // Now, compact
-      CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
       txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
       Worker t = new Worker();
       t.setThreadId((int) t.getId());
@@ -738,7 +737,7 @@ public class TestCompactor {
       txnBatch.abort();
 
       // Now, compact
-      CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
       txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
       Worker t = new Worker();
       t.setThreadId((int) t.getId());
@@ -804,7 +803,7 @@ public class TestCompactor {
 
 
       // Now, compact
-      CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
       txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
       Worker t = new Worker();
       t.setThreadId((int) t.getId());

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
index 767bc54..b241e9e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 
 
 /**
@@ -33,7 +34,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnHandler;
  */
 public class AcidEventListener extends MetaStoreEventListener {
 
-  private TxnHandler txnHandler;
+  private TxnStore txnHandler;
   private HiveConf hiveConf;
 
   public AcidEventListener(Configuration configuration) {
@@ -46,24 +47,47 @@ public class AcidEventListener extends MetaStoreEventListener {
     // We can loop thru all the tables to check if they are ACID first and then perform cleanup,
     // but it's more efficient to unconditionally perform cleanup for the database, especially
     // when there are a lot of tables
-    txnHandler = new TxnHandler(hiveConf);
+    txnHandler = getTxnHandler();
     txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
   }
 
   @Override
   public void onDropTable(DropTableEvent tableEvent)  throws MetaException {
-    if (TxnHandler.isAcidTable(tableEvent.getTable())) {
-      txnHandler = new TxnHandler(hiveConf);
+    if (TxnUtils.isAcidTable(tableEvent.getTable())) {
+      txnHandler = getTxnHandler();
       txnHandler.cleanupRecords(HiveObjectType.TABLE, null, tableEvent.getTable(), null);
     }
   }
 
   @Override
   public void onDropPartition(DropPartitionEvent partitionEvent)  throws MetaException {
-    if (TxnHandler.isAcidTable(partitionEvent.getTable())) {
-      txnHandler = new TxnHandler(hiveConf);
+    if (TxnUtils.isAcidTable(partitionEvent.getTable())) {
+      txnHandler = getTxnHandler();
       txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
           partitionEvent.getPartitionIterator());
     }
   }
+  private TxnStore getTxnHandler() {
+    boolean hackOn = HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST) ||
+      HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
+    String origTxnMgr = null;
+    boolean origConcurrency = false;
+
+    // Since TxnUtils.getTxnStore calls TxnHandler.setConf -> checkQFileTestHack -> TxnDbUtil.setConfValues,
+    // which may change the values of below two entries, we need to avoid pulluting the original values
+    if (hackOn) {
+      origTxnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
+      origConcurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+    }
+
+    txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+    // Set them back
+    if (hackOn) {
+      hiveConf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, origTxnMgr);
+      hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, origConcurrency);
+    }
+
+    return txnHandler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index fba545d..bf65532 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
-
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -176,7 +175,8 @@ import org.apache.hadoop.hive.metastore.model.MRoleMap;
 import org.apache.hadoop.hive.metastore.model.MTableColumnPrivilege;
 import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.shims.HadoopShims;
@@ -308,9 +308,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         };
 
-    private final ThreadLocal<TxnHandler> threadLocalTxn = new ThreadLocal<TxnHandler>() {
+    private static final ThreadLocal<TxnStore> threadLocalTxn = new ThreadLocal<TxnStore>() {
       @Override
-      protected synchronized TxnHandler initialValue() {
+      protected TxnStore initialValue() {
         return null;
       }
     };
@@ -584,10 +584,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       return ms;
     }
 
-    private TxnHandler getTxnHandler() {
-      TxnHandler txn = threadLocalTxn.get();
+    private TxnStore getTxnHandler() {
+      TxnStore txn = threadLocalTxn.get();
       if (txn == null) {
-        txn = new TxnHandler(hiveConf);
+        txn = TxnUtils.getTxnStore(hiveConf);
         threadLocalTxn.set(txn);
       }
       return txn;

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 393ef3b..50bf43c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -134,7 +134,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge;
@@ -1846,12 +1846,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
 
   @Override
   public ValidTxnList getValidTxns() throws TException {
-    return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0);
+    return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0);
   }
 
   @Override
   public ValidTxnList getValidTxns(long currentTxn) throws TException {
-    return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn);
+    return TxnUtils.createValidReadTxnList(client.get_open_txns(), currentTxn);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 28e06ed..f7c738a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -20,27 +20,33 @@ package org.apache.hadoop.hive.metastore.txn;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.util.StringUtils;
 
-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Extends the transaction handler with methods needed only by the compactor threads.  These
  * methods are not available through the thrift interface.
  */
-public class CompactionTxnHandler extends TxnHandler {
+class CompactionTxnHandler extends TxnHandler {
   static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
   // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
   // See TxnHandler for notes on how to deal with deadlocks.  Follow those notes.
 
-  public CompactionTxnHandler(HiveConf conf) {
-    super(conf);
+  public CompactionTxnHandler() {
   }
 
   /**
@@ -385,7 +391,7 @@ public class CompactionTxnHandler extends TxnHandler {
           }
 
           // Populate the complete query with provided prefix and suffix
-          TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
+          TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
 
           for (String query : queries) {
             LOG.debug("Going to execute update <" + query + ">");
@@ -450,7 +456,7 @@ public class CompactionTxnHandler extends TxnHandler {
         prefix.append("delete from TXNS where ");
         suffix.append("");
 
-        TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
+        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
 
         for (String query : queries) {
           LOG.debug("Going to execute update <" + query + ">");
@@ -620,27 +626,6 @@ public class CompactionTxnHandler extends TxnHandler {
   }
 
   /**
-   * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
-   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
-   * compact the files, and thus treats only open transactions as invalid.  Additionally any
-   * txnId > highestOpenTxnId is also invalid.  This is avoid creating something like
-   * delta_17_120 where txnId 80, for example, is still open.
-   * @param txns txn list from the metastore
-   * @return a valid txn list.
-   */
-  public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
-    long highWater = txns.getTxn_high_water_mark();
-    long minOpenTxn = Long.MAX_VALUE;
-    long[] exceptions = new long[txns.getOpen_txnsSize()];
-    int i = 0;
-    for (TxnInfo txn : txns.getOpen_txns()) {
-      if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
-      exceptions[i++] = txn.getId();
-    }
-    highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
-    return new ValidCompactorTxnList(exceptions, -1, highWater);
-  }
-  /**
    * Record the highest txn id that the {@code ci} compaction job will pay attention to.
    */
   public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
@@ -746,7 +731,7 @@ public class CompactionTxnHandler extends TxnHandler {
         prefix.append("delete from COMPLETED_COMPACTIONS where ");
         suffix.append("");
 
-        TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
+        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
 
         for (String query : queries) {
           LOG.debug("Going to execute update <" + query + ">");

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0ddc078..9789371 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -87,14 +87,7 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class TxnHandler {
-  // Compactor states (Should really be enum)
-  static final public String INITIATED_RESPONSE = "initiated";
-  static final public String WORKING_RESPONSE = "working";
-  static final public String CLEANING_RESPONSE = "ready for cleaning";
-  static final public String FAILED_RESPONSE = "failed";
-  static final public String SUCCEEDED_RESPONSE = "succeeded";
-  static final public String ATTEMPTED_RESPONSE = "attempted";
+abstract class TxnHandler implements TxnStore {
 
   static final protected char INITIATED_STATE = 'i';
   static final protected char WORKING_STATE = 'w';
@@ -131,7 +124,7 @@ public class TxnHandler {
    * Number of consecutive deadlocks we have seen
    */
   private int deadlockCnt;
-  private final long deadlockRetryInterval;
+  private long deadlockRetryInterval;
   protected HiveConf conf;
   protected DatabaseProduct dbProduct;
 
@@ -139,8 +132,8 @@ public class TxnHandler {
   private long timeout;
 
   private String identifierQuoteString; // quotes to use for quoting tables, where necessary
-  private final long retryInterval;
-  private final int retryLimit;
+  private long retryInterval;
+  private int retryLimit;
   private int retryNum;
   /**
    * Derby specific concurrency control
@@ -157,7 +150,10 @@ public class TxnHandler {
   // in mind.  To do this they should call checkRetryable() AFTER rolling back the db transaction,
   // and then they should catch RetryException and call themselves recursively. See commitTxn for an example.
 
-  public TxnHandler(HiveConf conf) {
+  public TxnHandler() {
+  }
+
+  public void setConf(HiveConf conf) {
     this.conf = conf;
 
     checkQFileTestHack();
@@ -183,7 +179,6 @@ public class TxnHandler {
         TimeUnit.MILLISECONDS);
     retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
-
   }
 
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -1211,6 +1206,7 @@ public class TxnHandler {
    * Clean up corresponding records in metastore tables, specifically:
    * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
    */
+  @Override
   public void cleanupRecords(HiveObjectType type, Database db, Table table,
                              Iterator<Partition> partitionIterator) throws MetaException {
     try {
@@ -1386,106 +1382,11 @@ public class TxnHandler {
     String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
     return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
   }
-
-  /**
-   * Build a query (or queries if one query is too big) with specified "prefix" and "suffix",
-   * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6)
-   * For NOT IN case, NOT IN list is broken into multiple AND clauses.
-   * @param queries array of complete query strings
-   * @param prefix part of the query that comes before IN list
-   * @param suffix part of the query that comes after IN list
-   * @param inList the list containing IN list values
-   * @param inColumn column name of IN list operator
-   * @param addParens add a pair of parenthesis outside the IN lists
-   *                  e.g. ( id in (1,2,3) OR id in (4,5,6) )
-   * @param notIn clause to be broken up is NOT IN
-   */
-  public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
-                                            StringBuilder suffix, List<Long> inList,
-                                            String inColumn, boolean addParens, boolean notIn) {
-    int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
-    int numWholeBatches = inList.size() / batchSize;
-    StringBuilder buf = new StringBuilder();
-    buf.append(prefix);
-    if (addParens) {
-      buf.append("(");
-    }
-    buf.append(inColumn);
-    if (notIn) {
-      buf.append(" not in (");
-    } else {
-      buf.append(" in (");
-    }
-
-    for (int i = 0; i <= numWholeBatches; i++) {
-      if (needNewQuery(conf, buf)) {
-        // Wrap up current query string
-        if (addParens) {
-          buf.append(")");
-        }
-        buf.append(suffix);
-        queries.add(buf.toString());
-
-        // Prepare a new query string
-        buf.setLength(0);
-      }
-
-      if (i > 0) {
-        if (notIn) {
-          if (buf.length() == 0) {
-            buf.append(prefix);
-            if (addParens) {
-              buf.append("(");
-            }
-          } else {
-            buf.append(" and ");
-          }
-          buf.append(inColumn);
-          buf.append(" not in (");
-        } else {
-          if (buf.length() == 0) {
-            buf.append(prefix);
-            if (addParens) {
-              buf.append("(");
-            }
-          } else {
-            buf.append(" or ");
-          }
-          buf.append(inColumn);
-          buf.append(" in (");
-        }
-      }
-
-      if (i * batchSize == inList.size()) {
-        // At this point we just realized we don't need another query
-        return;
-      }
-      for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) {
-        buf.append(inList.get(j)).append(",");
-      }
-      buf.setCharAt(buf.length() - 1, ')');
-    }
-
-    if (addParens) {
-      buf.append(")");
-    }
-    buf.append(suffix);
-    queries.add(buf.toString());
-  }
-
-  /** Estimate if the size of a string will exceed certain limit */
-  private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
-    int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
-    // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
-    long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
-    return sizeInBytes / 1024 > queryMemoryLimit;
-  }
-
   /**
    * For testing only, do not use.
    */
   @VisibleForTesting
-  int numLocksInLockTable() throws SQLException, MetaException {
+  public int numLocksInLockTable() throws SQLException, MetaException {
     Connection dbConn = null;
     Statement stmt = null;
     ResultSet rs = null;
@@ -1508,7 +1409,7 @@ public class TxnHandler {
   /**
    * For testing only, do not use.
    */
-  long setTimeout(long milliseconds) {
+  public long setTimeout(long milliseconds) {
     long previous_timeout = timeout;
     timeout = milliseconds;
     return previous_timeout;
@@ -1975,7 +1876,7 @@ public class TxnHandler {
         suffix.append("");
       }
 
-      TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);
+      TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", true, false);
 
       for (String query : queries) {
         LOG.debug("Going to execute update <" + query + ">");
@@ -1998,7 +1899,7 @@ public class TxnHandler {
       prefix.append("delete from HIVE_LOCKS where ");
       suffix.append("");
 
-      TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);
+      TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "hl_txnid", false, false);
 
       for (String query : queries) {
         LOG.debug("Going to execute update <" + query + ">");
@@ -2435,7 +2336,7 @@ public class TxnHandler {
       prefix.append(" and hl_txnid = 0 and ");
       suffix.append("");
 
-      TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);
+      TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "hl_lock_ext_id", true, false);
 
       int deletedLocks = 0;
       for (String query : queries) {

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
new file mode 100644
index 0000000..6fc6ed9
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A handler to answer transaction related calls that come into the metastore
+ * server.
+ *
+ * Note on log messages:  Please include txnid:X and lockid info using
+ * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)}
+ * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
+ * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
+ * so keeping the format consistent makes grep'ing the logs much easier.
+ *
+ * Note on HIVE_LOCKS.hl_last_heartbeat.
+ * For locks that are part of transaction, we set this 0 (would rather set it to NULL but
+ * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
+ * transaction in TXNS.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface TxnStore {
+
+  // Compactor states (Should really be enum)
+  static final public String INITIATED_RESPONSE = "initiated";
+  static final public String WORKING_RESPONSE = "working";
+  static final public String CLEANING_RESPONSE = "ready for cleaning";
+  static final public String FAILED_RESPONSE = "failed";
+  static final public String SUCCEEDED_RESPONSE = "succeeded";
+  static final public String ATTEMPTED_RESPONSE = "attempted";
+
+  public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
+
+  public void setConf(HiveConf conf);
+
+  /**
+   * Get information about open transactions.  This gives extensive information about the
+   * transactions rather than just the list of transactions.  This should be used when the need
+   * is to see information about the transactions (e.g. show transactions).
+   * @return information about open transactions
+   * @throws MetaException
+   */
+  public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException;
+
+  /**
+   * Get list of valid transactions.  This gives just the list of transactions that are open.
+   * @return list of open transactions, as well as a high water mark.
+   * @throws MetaException
+   */
+  public GetOpenTxnsResponse getOpenTxns() throws MetaException;
+
+  /**
+   * Open a set of transactions
+   * @param rqst request to open transactions
+   * @return information on opened transactions
+   * @throws MetaException
+   */
+  public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
+
+  /**
+   * Abort (rollback) a transaction.
+   * @param rqst info on transaction to abort
+   * @throws NoSuchTxnException
+   * @throws MetaException
+   */
+  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException;
+
+  /**
+   * Commit a transaction
+   * @param rqst info on transaction to commit
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  public void commitTxn(CommitTxnRequest rqst)
+    throws NoSuchTxnException, TxnAbortedException,  MetaException;
+
+  /**
+   * Obtain a lock.
+   * @param rqst information on the lock to obtain.  If the requester is part of a transaction
+   *             the txn information must be included in the lock request.
+   * @return info on the lock, including whether it was obtained.
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  public LockResponse lock(LockRequest rqst)
+    throws NoSuchTxnException, TxnAbortedException, MetaException;
+
+  /**
+   * Check whether a lock has been obtained.  This is used after {@link #lock} returned a wait
+   * state.
+   * @param rqst info on the lock to check
+   * @return info on the state of the lock
+   * @throws NoSuchTxnException
+   * @throws NoSuchLockException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  public LockResponse checkLock(CheckLockRequest rqst)
+    throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException;
+
+  /**
+   * Unlock a lock.  It is not legal to call this if the caller is part of a txn.  In that case
+   * the txn should be committed or aborted instead.  (Note someday this will change since
+   * multi-statement transactions will allow unlocking in the transaction.)
+   * @param rqst lock to unlock
+   * @throws NoSuchLockException
+   * @throws TxnOpenException
+   * @throws MetaException
+   */
+  public void unlock(UnlockRequest rqst)
+    throws NoSuchLockException, TxnOpenException, MetaException;
+
+  /**
+   * Get information on current locks.
+   * @param rqst lock information to retrieve
+   * @return lock information.
+   * @throws MetaException
+   */
+  public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException;
+
+  /**
+   * Send a heartbeat for a lock or a transaction
+   * @param ids lock and/or txn id to heartbeat
+   * @throws NoSuchTxnException
+   * @throws NoSuchLockException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  public void heartbeat(HeartbeatRequest ids)
+    throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, MetaException;
+
+  /**
+   * Heartbeat a group of transactions together
+   * @param rqst set of transactions to heartbat
+   * @return info on txns that were heartbeated
+   * @throws MetaException
+   */
+  public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
+    throws MetaException;
+
+  /**
+   * Submit a compaction request into the queue.  This is called when a user manually requests a
+   * compaction.
+   * @param rqst information on what to compact
+   * @return id of the compaction that has been started
+   * @throws MetaException
+   */
+  public long compact(CompactionRequest rqst) throws MetaException;
+
+  /**
+   * Show list of current compactions
+   * @param rqst info on which compactions to show
+   * @return compaction information
+   * @throws MetaException
+   */
+  public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException;
+
+  /**
+   * Add information on a set of dynamic partitions that participated in a transaction.
+   * @param rqst dynamic partition info.
+   * @throws NoSuchTxnException
+   * @throws TxnAbortedException
+   * @throws MetaException
+   */
+  public void addDynamicPartitions(AddDynamicPartitions rqst)
+      throws NoSuchTxnException,  TxnAbortedException, MetaException;
+
+  /**
+   * Clean up corresponding records in metastore tables
+   * @param type Hive object type
+   * @param db database object
+   * @param table table object
+   * @param partitionIterator partition iterator
+   * @throws MetaException
+   */
+  public void cleanupRecords(HiveObjectType type, Database db, Table table,
+                             Iterator<Partition> partitionIterator) throws MetaException;
+  /**
+   * Timeout transactions and/or locks.  This should only be called by the compactor.
+   */
+  public void performTimeOuts();
+
+  /**
+   * This will look through the completed_txn_components table and look for partitions or tables
+   * that may be ready for compaction.  Also, look through txns and txn_components tables for
+   * aborted transactions that we should add to the list.
+   * @param maxAborted Maximum number of aborted queries to allow before marking this as a
+   *                   potential compaction.
+   * @return list of CompactionInfo structs.  These will not have id, type,
+   * or runAs set since these are only potential compactions not actual ones.
+   */
+  public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+
+  /**
+   * Sets the user to run as.  This is for the case
+   * where the request was generated by the user and so the worker must set this value later.
+   * @param cq_id id of this entry in the queue
+   * @param user user to run the jobs as
+   */
+  public void setRunAs(long cq_id, String user) throws MetaException;
+
+  /**
+   * This will grab the next compaction request off of
+   * the queue, and assign it to the worker.
+   * @param workerId id of the worker calling this, will be recorded in the db
+   * @return an info element for this compaction request, or null if there is no work to do now.
+   */
+  public CompactionInfo findNextToCompact(String workerId) throws MetaException;
+
+  /**
+   * This will mark an entry in the queue as compacted
+   * and put it in the ready to clean state.
+   * @param info info on the compaction entry to mark as compacted.
+   */
+  public void markCompacted(CompactionInfo info) throws MetaException;
+
+  /**
+   * Find entries in the queue that are ready to
+   * be cleaned.
+   * @return information on the entry in the queue.
+   */
+  public List<CompactionInfo> findReadyToClean() throws MetaException;
+
+  /**
+   * This will remove an entry from the queue after
+   * it has been compacted.
+   * 
+   * @param info info on the compaction entry to remove
+   */
+  public void markCleaned(CompactionInfo info) throws MetaException;
+
+  /**
+   * Mark a compaction entry as failed.  This will move it to the compaction history queue with a
+   * failed status.  It will NOT clean up aborted transactions in the table/partition associated
+   * with this compaction.
+   * @param info information on the compaction that failed.
+   * @throws MetaException
+   */
+  public void markFailed(CompactionInfo info) throws MetaException;
+
+  /**
+   * Clean up aborted transactions from txns that have no components in txn_components.  The reson such
+   * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
+   * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+   */
+  public void cleanEmptyAbortedTxns() throws MetaException;
+
+  /**
+   * This will take all entries assigned to workers
+   * on a host return them to INITIATED state.  The initiator should use this at start up to
+   * clean entries from any workers that were in the middle of compacting when the metastore
+   * shutdown.  It does not reset entries from worker threads on other hosts as those may still
+   * be working.
+   * @param hostname Name of this host.  It is assumed this prefixes the thread's worker id,
+   *                 so that like hostname% will match the worker id.
+   */
+  public void revokeFromLocalWorkers(String hostname) throws MetaException;
+
+  /**
+   * This call will return all compaction queue
+   * entries assigned to a worker but over the timeout back to the initiated state.
+   * This should be called by the initiator on start up and occasionally when running to clean up
+   * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} should be called
+   * first.
+   * @param timeout number of milliseconds since start time that should elapse before a worker is
+   *                declared dead.
+   */
+  public void revokeTimedoutWorkers(long timeout) throws MetaException;
+
+  /**
+   * Queries metastore DB directly to find columns in the table which have statistics information.
+   * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+   * table level stats are examined.
+   * @throws MetaException
+   */
+  public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
+
+  /**
+   * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+   */
+  public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException;
+
+  /**
+   * For any given compactable entity (partition, table if not partitioned) the history of compactions
+   * may look like "sssfffaaasffss", for example.  The idea is to retain the tail (most recent) of the
+   * history such that a configurable number of each type of state is present.  Any other entries
+   * can be purged.  This scheme has advantage of always retaining the last failure/success even if
+   * it's not recent.
+   * @throws MetaException
+   */
+  public void purgeCompactionHistory() throws MetaException;
+
+  /**
+   * Determine if there are enough consecutive failures compacting a table or partition that no
+   * new automatic compactions should be scheduled.  User initiated compactions do not do this
+   * check.
+   * @param ci  Table or partition to check.
+   * @return true if it is ok to compact, false if there have been too many failures.
+   * @throws MetaException
+   */
+  public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException;
+
+
+  @VisibleForTesting
+  public int numLocksInLockTable() throws SQLException, MetaException;
+
+  @VisibleForTesting
+  long setTimeout(long milliseconds);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
new file mode 100644
index 0000000..f60e34b
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TxnUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
+
+  /**
+   * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
+   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
+   * read the files, and thus treats both open and aborted transactions as invalid.
+   * @param txns txn list from the metastore
+   * @param currentTxn Current transaction that the user has open.  If this is greater than 0 it
+   *                   will be removed from the exceptions list so that the user sees his own
+   *                   transaction as valid.
+   * @return a valid txn list.
+   */
+  public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+    long highWater = txns.getTxn_high_water_mark();
+    Set<Long> open = txns.getOpen_txns();
+    long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
+    int i = 0;
+    for(long txn: open) {
+      if (currentTxn > 0 && currentTxn == txn) continue;
+      exceptions[i++] = txn;
+    }
+    return new ValidReadTxnList(exceptions, highWater);
+  }
+
+  /**
+   * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
+   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
+   * compact the files, and thus treats only open transactions as invalid.  Additionally any
+   * txnId > highestOpenTxnId is also invalid.  This is avoid creating something like
+   * delta_17_120 where txnId 80, for example, is still open.
+   * @param txns txn list from the metastore
+   * @return a valid txn list.
+   */
+  public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+    long highWater = txns.getTxn_high_water_mark();
+    long minOpenTxn = Long.MAX_VALUE;
+    long[] exceptions = new long[txns.getOpen_txnsSize()];
+    int i = 0;
+    for (TxnInfo txn : txns.getOpen_txns()) {
+      if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
+      exceptions[i++] = txn.getId();
+    }
+    highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
+    return new ValidCompactorTxnList(exceptions, -1, highWater);
+  }
+
+  /**
+   * Get an instance of the TxnStore that is appropriate for this store
+   * @param conf configuration
+   * @return txn store
+   */
+  public static TxnStore getTxnStore(HiveConf conf) {
+    String className = conf.getVar(HiveConf.ConfVars.METASTORE_TXN_STORE_IMPL);
+    try {
+      TxnStore handler = ((Class<? extends TxnHandler>) MetaStoreUtils.getClass(
+        className)).newInstance();
+      handler.setConf(conf);
+      return handler;
+    } catch (Exception e) {
+      LOG.error("Unable to instantiate raw store directly in fastpath mode", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Checks if a table is a valid ACID table.
+   * Note, users are responsible for using the correct TxnManager. We do not look at
+   * SessionState.get().getTxnMgr().supportsAcid() here
+   * @param table table
+   * @return true if table is a legit ACID table, false otherwise
+   */
+  public static boolean isAcidTable(Table table) {
+    if (table == null) {
+      return false;
+    }
+    Map<String, String> parameters = table.getParameters();
+    String tableIsTransactional = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
+  }
+  /**
+   * Build a query (or queries if one query is too big) with specified "prefix" and "suffix",
+   * while populating the IN list into multiple OR clauses, e.g. id in (1,2,3) OR id in (4,5,6)
+   * For NOT IN case, NOT IN list is broken into multiple AND clauses.
+   * @param queries array of complete query strings
+   * @param prefix part of the query that comes before IN list
+   * @param suffix part of the query that comes after IN list
+   * @param inList the list containing IN list values
+   * @param inColumn column name of IN list operator
+   * @param addParens add a pair of parenthesis outside the IN lists
+   *                  e.g. ( id in (1,2,3) OR id in (4,5,6) )
+   * @param notIn clause to be broken up is NOT IN
+   */
+  public static void buildQueryWithINClause(HiveConf conf, List<String> queries, StringBuilder prefix,
+                                            StringBuilder suffix, List<Long> inList,
+                                            String inColumn, boolean addParens, boolean notIn) {
+    int batchSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
+    int numWholeBatches = inList.size() / batchSize;
+    StringBuilder buf = new StringBuilder();
+    buf.append(prefix);
+    if (addParens) {
+      buf.append("(");
+    }
+    buf.append(inColumn);
+    if (notIn) {
+      buf.append(" not in (");
+    } else {
+      buf.append(" in (");
+    }
+
+    for (int i = 0; i <= numWholeBatches; i++) {
+      if (needNewQuery(conf, buf)) {
+        // Wrap up current query string
+        if (addParens) {
+          buf.append(")");
+        }
+        buf.append(suffix);
+        queries.add(buf.toString());
+
+        // Prepare a new query string
+        buf.setLength(0);
+      }
+
+      if (i > 0) {
+        if (notIn) {
+          if (buf.length() == 0) {
+            buf.append(prefix);
+            if (addParens) {
+              buf.append("(");
+            }
+          } else {
+            buf.append(" and ");
+          }
+          buf.append(inColumn);
+          buf.append(" not in (");
+        } else {
+          if (buf.length() == 0) {
+            buf.append(prefix);
+            if (addParens) {
+              buf.append("(");
+            }
+          } else {
+            buf.append(" or ");
+          }
+          buf.append(inColumn);
+          buf.append(" in (");
+        }
+      }
+
+      if (i * batchSize == inList.size()) {
+        // At this point we just realized we don't need another query
+        return;
+      }
+      for (int j = i * batchSize; j < (i + 1) * batchSize && j < inList.size(); j++) {
+        buf.append(inList.get(j)).append(",");
+      }
+      buf.setCharAt(buf.length() - 1, ')');
+    }
+
+    if (addParens) {
+      buf.append(")");
+    }
+    buf.append(suffix);
+    queries.add(buf.toString());
+  }
+
+  /** Estimate if the size of a string will exceed certain limit */
+  private static boolean needNewQuery(HiveConf conf, StringBuilder sb) {
+    int queryMemoryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH);
+    // http://www.javamex.com/tutorials/memory/string_memory_usage.shtml
+    long sizeInBytes = 8 * (((sb.length() * 2) + 45) / 8);
+    return sizeInBytes / 1024 > queryMemoryLimit;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 051da60..bdeacb9 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -42,8 +42,7 @@ import static junit.framework.Assert.*;
 public class TestCompactionTxnHandler {
 
   private HiveConf conf = new HiveConf();
-  private CompactionTxnHandler txnHandler;
-  static final private Log LOG = LogFactory.getLog(TestCompactionTxnHandler.class);
+  private TxnStore txnHandler;
 
   public TestCompactionTxnHandler() throws Exception {
     TxnDbUtil.setConfValues(conf);
@@ -424,7 +423,7 @@ public class TestCompactionTxnHandler {
   @Before
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();
-    txnHandler = new CompactionTxnHandler(conf);
+    txnHandler = TxnUtils.getTxnStore(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 930af7c..b8cab71 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -34,7 +34,11 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static junit.framework.Assert.*;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
 
 /**
  * Tests for TxnHandler.
@@ -44,7 +48,7 @@ public class TestTxnHandler {
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
   private HiveConf conf = new HiveConf();
-  private TxnHandler txnHandler;
+  private TxnStore txnHandler;
 
   public TestTxnHandler() throws Exception {
     TxnDbUtil.setConfValues(conf);
@@ -1111,99 +1115,102 @@ public class TestTxnHandler {
   @Ignore
   public void deadlockDetected() throws Exception {
     LOG.debug("Starting deadlock test");
-    Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-    Statement stmt = conn.createStatement();
-    long now = txnHandler.getDbTime(conn);
-    stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
-        "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
-        "'scooby.com')");
-    stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
-        "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
-        "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
-        txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
-        "'scooby.com')");
-    conn.commit();
-    txnHandler.closeDbConn(conn);
-
-    final AtomicBoolean sawDeadlock = new AtomicBoolean();
-
-    final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-    final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
-    try {
+    if (txnHandler instanceof TxnHandler) {
+      final TxnHandler tHndlr = (TxnHandler)txnHandler;
+      Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      Statement stmt = conn.createStatement();
+      long now = tHndlr.getDbTime(conn);
+      stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+          "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+          "'scooby.com')");
+      stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+          "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+          "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+          tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
+          "'scooby.com')");
+      conn.commit();
+      tHndlr.closeDbConn(conn);
+
+      final AtomicBoolean sawDeadlock = new AtomicBoolean();
+
+      final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+      try {
 
-      for (int i = 0; i < 5; i++) {
-        Thread t1 = new Thread() {
-          @Override
-          public void run() {
-            try {
+        for (int i = 0; i < 5; i++) {
+          Thread t1 = new Thread() {
+            @Override
+            public void run() {
               try {
-                updateTxns(conn1);
-                updateLocks(conn1);
-                Thread.sleep(1000);
-                conn1.commit();
-                LOG.debug("no exception, no deadlock");
-              } catch (SQLException e) {
                 try {
-                  txnHandler.checkRetryable(conn1, e, "thread t1");
-                  LOG.debug("Got an exception, but not a deadlock, SQLState is " +
-                      e.getSQLState() + " class of exception is " + e.getClass().getName() +
-                      " msg is <" + e.getMessage() + ">");
-                } catch (TxnHandler.RetryException de) {
-                  LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
-                      "exception is " + e.getClass().getName() + " msg is <" + e
-                      .getMessage() + ">");
-                  sawDeadlock.set(true);
+                  updateTxns(conn1);
+                  updateLocks(conn1);
+                  Thread.sleep(1000);
+                  conn1.commit();
+                  LOG.debug("no exception, no deadlock");
+                } catch (SQLException e) {
+                  try {
+                    tHndlr.checkRetryable(conn1, e, "thread t1");
+                    LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+                        e.getSQLState() + " class of exception is " + e.getClass().getName() +
+                        " msg is <" + e.getMessage() + ">");
+                  } catch (TxnHandler.RetryException de) {
+                    LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+                        "exception is " + e.getClass().getName() + " msg is <" + e
+                        .getMessage() + ">");
+                    sawDeadlock.set(true);
+                  }
                 }
+                conn1.rollback();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
               }
-              conn1.rollback();
-            } catch (Exception e) {
-              throw new RuntimeException(e);
             }
-          }
-        };
+          };
 
-        Thread t2 = new Thread() {
-          @Override
-          public void run() {
-            try {
+          Thread t2 = new Thread() {
+            @Override
+            public void run() {
               try {
-                updateLocks(conn2);
-                updateTxns(conn2);
-                Thread.sleep(1000);
-                conn2.commit();
-                LOG.debug("no exception, no deadlock");
-              } catch (SQLException e) {
                 try {
-                  txnHandler.checkRetryable(conn2, e, "thread t2");
-                  LOG.debug("Got an exception, but not a deadlock, SQLState is " +
-                      e.getSQLState() + " class of exception is " + e.getClass().getName() +
-                      " msg is <" + e.getMessage() + ">");
-                } catch (TxnHandler.RetryException de) {
-                  LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
-                      "exception is " + e.getClass().getName() + " msg is <" + e
-                      .getMessage() + ">");
-                  sawDeadlock.set(true);
+                  updateLocks(conn2);
+                  updateTxns(conn2);
+                  Thread.sleep(1000);
+                  conn2.commit();
+                  LOG.debug("no exception, no deadlock");
+                } catch (SQLException e) {
+                  try {
+                    tHndlr.checkRetryable(conn2, e, "thread t2");
+                    LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+                        e.getSQLState() + " class of exception is " + e.getClass().getName() +
+                        " msg is <" + e.getMessage() + ">");
+                  } catch (TxnHandler.RetryException de) {
+                    LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+                        "exception is " + e.getClass().getName() + " msg is <" + e
+                        .getMessage() + ">");
+                    sawDeadlock.set(true);
+                  }
                 }
+                conn2.rollback();
+              } catch (Exception e) {
+                throw new RuntimeException(e);
               }
-              conn2.rollback();
-            } catch (Exception e) {
-              throw new RuntimeException(e);
             }
-          }
-        };
-
-        t1.start();
-        t2.start();
-        t1.join();
-        t2.join();
-        if (sawDeadlock.get()) break;
+          };
+
+          t1.start();
+          t2.start();
+          t1.join();
+          t2.join();
+          if (sawDeadlock.get()) break;
+        }
+        assertTrue(sawDeadlock.get());
+      } finally {
+        conn1.rollback();
+        tHndlr.closeDbConn(conn1);
+        conn2.rollback();
+        tHndlr.closeDbConn(conn2);
       }
-      assertTrue(sawDeadlock.get());
-    } finally {
-      conn1.rollback();
-      txnHandler.closeDbConn(conn1);
-      conn2.rollback();
-      txnHandler.closeDbConn(conn2);
     }
   }
 
@@ -1236,7 +1243,7 @@ public class TestTxnHandler {
     for (long i = 1; i <= 200; i++) {
       inList.add(i);
     }
-    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(1, queries.size());
     runAgainstDerby(queries);
 
@@ -1244,7 +1251,7 @@ public class TestTxnHandler {
     //          The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
     queries.clear();
     inList.add((long)201);
-    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(2, queries.size());
     runAgainstDerby(queries);
 
@@ -1255,13 +1262,13 @@ public class TestTxnHandler {
     for (long i = 202; i <= 4321; i++) {
       inList.add(i);
     }
-    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(3, queries.size());
     runAgainstDerby(queries);
 
     // Case 4 - NOT IN list
     queries.clear();
-    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
     Assert.assertEquals(3, queries.size());
     runAgainstDerby(queries);
 
@@ -1269,7 +1276,7 @@ public class TestTxnHandler {
     queries.clear();
     suffix.setLength(0);
     suffix.append("");
-    TxnHandler.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
     Assert.assertEquals(3, queries.size());
     runAgainstDerby(queries);
   }
@@ -1297,7 +1304,7 @@ public class TestTxnHandler {
   @Before
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();
-    txnHandler = new TxnHandler(conf);
+    txnHandler = TxnUtils.getTxnStore(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
index abe1e37..6c27515 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandlerNegative.java
@@ -37,7 +37,7 @@ public class TestTxnHandlerNegative {
     conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "blah");
     RuntimeException e = null;
     try {
-      TxnHandler txnHandler1 = new TxnHandler(conf);
+      TxnUtils.getTxnStore(conf);
     }
     catch(RuntimeException ex) {
       LOG.info("Expected error: " + ex.getMessage(), ex);

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
index a91ca5c..59c8fe4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidCompactionHistoryService.java
@@ -18,20 +18,12 @@
 package org.apache.hadoop.hive.ql.txn;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -60,10 +52,10 @@ public class AcidCompactionHistoryService extends HouseKeeperServiceBase {
   }
   
   private static final class ObsoleteEntryReaper implements Runnable {
-    private final CompactionTxnHandler txnHandler;
+    private final TxnStore txnHandler;
     private final AtomicInteger isAliveCounter;
     private ObsoleteEntryReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
-      txnHandler = new CompactionTxnHandler(hiveConf);
+      txnHandler = TxnUtils.getTxnStore(hiveConf);
       this.isAliveCounter = isAliveCounter;
     }
     

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index 38151fb..de74a7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -20,15 +20,10 @@ package org.apache.hadoop.hive.ql.txn;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HouseKeeperService;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
-import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -58,10 +53,10 @@ public class AcidHouseKeeperService extends HouseKeeperServiceBase {
   }
 
   private static final class TimedoutTxnReaper implements Runnable {
-    private final TxnHandler txnHandler;
+    private final TxnStore txnHandler;
     private final AtomicInteger isAliveCounter;
     private TimedoutTxnReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
-      txnHandler = new TxnHandler(hiveConf);
+      txnHandler = TxnUtils.getTxnStore(hiveConf);
       this.isAliveCounter = isAliveCounter;
     }
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index c956f58..ae8865c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -50,7 +51,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
   protected HiveConf conf;
-  protected CompactionTxnHandler txnHandler;
+  protected TxnStore txnHandler;
   protected RawStore rs;
   protected int threadId;
   protected AtomicBoolean stop;
@@ -75,7 +76,7 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
     setDaemon(true); // this means the process will exit without waiting for this thread
 
     // Get our own instance of the transaction handler
-    txnHandler = new CompactionTxnHandler(conf);
+    txnHandler = TxnUtils.getTxnStore(conf);
 
     // Get our own connection to the database so we can get table and partition information.
     rs = RawStoreProxy.getProxy(conf, conf,

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index c023c27..1898a4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -37,8 +37,8 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -81,7 +81,7 @@ public class Initiator extends CompactorThread {
         try {//todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
           ValidTxnList txns =
-              CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+              TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
           Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold);
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
@@ -184,7 +184,7 @@ public class Initiator extends CompactorThread {
                                             CompactionInfo ci) {
     if (compactions.getCompacts() != null) {
       for (ShowCompactResponseElement e : compactions.getCompacts()) {
-         if ((e.getState().equals(TxnHandler.WORKING_RESPONSE) || e.getState().equals(TxnHandler.INITIATED_RESPONSE)) &&
+         if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
             e.getDbname().equals(ci.dbname) &&
             e.getTablename().equals(ci.tableName) &&
             (e.getPartitionname() == null && ci.partName == null ||

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 59a765b..516b92e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -27,13 +27,15 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -135,7 +137,7 @@ public class Worker extends CompactorThread {
 
         final boolean isMajor = ci.isMajorCompaction();
         final ValidTxnList txns =
-            CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+            TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
         LOG.debug("ValidCompactTxnList: " + txns.writeToString());
         txnHandler.setCompactionHighestTxnId(ci, txns.getHighWatermark());
         final StringBuilder jobName = new StringBuilder(name);

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 44b77e7..6f8dc35 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
@@ -486,7 +486,7 @@ public class TestTxnCommands2 {
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
 
     int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     AtomicBoolean stop = new AtomicBoolean(true);
     //create failed compactions
     for(int i = 0; i < numFailedCompactions; i++) {
@@ -556,27 +556,27 @@ public class TestTxnCommands2 {
     private int working;
     private int total;
   }
-  private static CompactionsByState countCompacts(TxnHandler txnHandler) throws MetaException {
+  private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     CompactionsByState compactionsByState = new CompactionsByState();
     compactionsByState.total = resp.getCompactsSize();
     for(ShowCompactResponseElement compact : resp.getCompacts()) {
-      if(TxnHandler.FAILED_RESPONSE.equals(compact.getState())) {
+      if(TxnStore.FAILED_RESPONSE.equals(compact.getState())) {
         compactionsByState.failed++;
       }
-      else if(TxnHandler.CLEANING_RESPONSE.equals(compact.getState())) {
+      else if(TxnStore.CLEANING_RESPONSE.equals(compact.getState())) {
         compactionsByState.readyToClean++;
       }
-      else if(TxnHandler.INITIATED_RESPONSE.equals(compact.getState())) {
+      else if(TxnStore.INITIATED_RESPONSE.equals(compact.getState())) {
         compactionsByState.initiated++;
       }
-      else if(TxnHandler.SUCCEEDED_RESPONSE.equals(compact.getState())) {
+      else if(TxnStore.SUCCEEDED_RESPONSE.equals(compact.getState())) {
         compactionsByState.succeeded++;
       }
-      else if(TxnHandler.WORKING_RESPONSE.equals(compact.getState())) {
+      else if(TxnStore.WORKING_RESPONSE.equals(compact.getState())) {
         compactionsByState.working++;
       }
-      else if(TxnHandler.ATTEMPTED_RESPONSE.equals(compact.getState())) {
+      else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
         compactionsByState.attempted++;
       }
     }
@@ -632,7 +632,7 @@ public class TestTxnCommands2 {
     runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
 
     //run Worker to execute compaction
-    CompactionTxnHandler txnHandler = new CompactionTxnHandler(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
     Worker t = new Worker();
     t.setThreadId((int) t.getId());

http://git-wip-us.apache.org/repos/asf/hive/blob/c8295051/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index a4f7e5b..99705b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -42,7 +42,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -243,10 +247,10 @@ public class TestDbTxnManager {
     }
     expireLocks(txnMgr, 5);
     //create a lot of locks
-    for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
+    for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
       ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
     }
-    expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+    expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
   }
   private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
     DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();