You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/06/20 19:33:32 UTC

svn commit: r1604222 - in /hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn: CompactionTxnHandler.java TxnHandler.java

Author: hashutosh
Date: Fri Jun 20 17:33:31 2014
New Revision: 1604222

URL: http://svn.apache.org/r1604222
Log:
HIVE-6967 : Hive transaction manager fails when SQLServer is used as an RDBMS (Alan Gates via Ashutosh Chauhan)

Modified:
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1604222&r1=1604221&r2=1604222&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Fri Jun 20 17:33:31 2014
@@ -52,7 +52,7 @@ public class CompactionTxnHandler extend
    * or runAs set since these are only potential compactions not actual ones.
    */
   public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
-    Connection dbConn = getDbConn();
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     Set<CompactionInfo> response = new HashSet<CompactionInfo>();
     try {
       Statement stmt = dbConn.createStatement();
@@ -105,7 +105,7 @@ public class CompactionTxnHandler extend
    */
   public void setRunAs(long cq_id, String user) throws MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
        Statement stmt = dbConn.createStatement();
        String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
@@ -143,13 +143,13 @@ public class CompactionTxnHandler extend
    */
   public CompactionInfo findNextToCompact(String workerId) throws MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       CompactionInfo info = new CompactionInfo();
 
       try {
         Statement stmt = dbConn.createStatement();
         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
-            "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "' for update";
+            "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
         LOG.debug("Going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         if (!rs.next()) {
@@ -207,7 +207,7 @@ public class CompactionTxnHandler extend
    */
   public void markCompacted(CompactionInfo info) throws MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         Statement stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
@@ -246,7 +246,7 @@ public class CompactionTxnHandler extend
    * @return information on the entry in the queue.
    */
   public List<CompactionInfo> findReadyToClean() throws MetaException {
-    Connection dbConn = getDbConn();
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
 
     try {
@@ -293,7 +293,7 @@ public class CompactionTxnHandler extend
    */
   public void markCleaned(CompactionInfo info) throws MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         Statement stmt = dbConn.createStatement();
         String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
@@ -384,7 +384,7 @@ public class CompactionTxnHandler extend
    */
   public void cleanEmptyAbortedTxns() throws MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         Statement stmt = dbConn.createStatement();
         String s = "select txn_id from TXNS where " +
@@ -440,7 +440,7 @@ public class CompactionTxnHandler extend
    */
   public void revokeFromLocalWorkers(String hostname) throws MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         Statement stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
@@ -484,7 +484,7 @@ public class CompactionTxnHandler extend
    */
   public void revokeTimedoutWorkers(long timeout) throws MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       long latestValidStart = getDbTime(dbConn) - timeout;
       try {
         Statement stmt = dbConn.createStatement();

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1604222&r1=1604221&r2=1604222&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Fri Jun 20 17:33:31 2014
@@ -66,8 +66,8 @@ public class TxnHandler {
   static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
 
   static private BoneCP connPool;
-  private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock
-  // method
+  private static final Boolean lockLock = new Boolean("true"); // Random object to lock on for the
+  // lock method
 
   /**
    * Number of consecutive deadlocks we have seen
@@ -119,13 +119,12 @@ public class TxnHandler {
     // open transactions.  To avoid needing a transaction on the underlying
     // database we'll look at the current transaction number first.  If it
     // subsequently shows up in the open list that's ok.
-    Connection dbConn = getDbConn();
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     try {
       Statement stmt = dbConn.createStatement();
-      LOG.debug("Going to execute query <select ntxn_next - 1 from " +
-          "NEXT_TXN_ID>");
-      ResultSet rs =
-          stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+      String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+      LOG.debug("Going to execute query <" + s + ">");
+      ResultSet rs = stmt.executeQuery(s);
       if (!rs.next()) {
         throw new MetaException("Transaction tables not properly " +
             "initialized, no record found in next_txn_id");
@@ -137,8 +136,9 @@ public class TxnHandler {
       }
 
       List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
-      LOG.debug("Going to execute query<select txn_id, txn_state from TXNS>");
-      rs = stmt.executeQuery("select txn_id, txn_state, txn_user, txn_host from TXNS");
+      s = "select txn_id, txn_state, txn_user, txn_host from TXNS";
+      LOG.debug("Going to execute query<" + s + ">");
+      rs = stmt.executeQuery(s);
       while (rs.next()) {
         char c = rs.getString(2).charAt(0);
         TxnState state;
@@ -179,14 +179,13 @@ public class TxnHandler {
     // open transactions.  To avoid needing a transaction on the underlying
     // database we'll look at the current transaction number first.  If it
     // subsequently shows up in the open list that's ok.
-    Connection dbConn = getDbConn();
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     try {
       timeOutTxns(dbConn);
       Statement stmt = dbConn.createStatement();
-      LOG.debug("Going to execute query <select ntxn_next - 1 from " +
-          "NEXT_TXN_ID>");
-      ResultSet rs =
-          stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+      String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+      LOG.debug("Going to execute query <" + s + ">");
+      ResultSet rs = stmt.executeQuery(s);
       if (!rs.next()) {
         throw new MetaException("Transaction tables not properly " +
             "initialized, no record found in next_txn_id");
@@ -198,8 +197,9 @@ public class TxnHandler {
       }
 
       Set<Long> openList = new HashSet<Long>();
-      LOG.debug("Going to execute query<select txn_id from TXNS>");
-      rs = stmt.executeQuery("select txn_id from TXNS");
+      s = "select txn_id from TXNS";
+      LOG.debug("Going to execute query<" + s + ">");
+      rs = stmt.executeQuery(s);
       while (rs.next()) {
         openList.add(rs.getLong(1));
       }
@@ -234,7 +234,7 @@ public class TxnHandler {
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
     int numTxns = rqst.getNum_txns();
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         // Make sure the user has not requested an insane amount of txns.
         int maxTxns = HiveConf.getIntVar(conf,
@@ -242,16 +242,15 @@ public class TxnHandler {
         if (numTxns > maxTxns) numTxns = maxTxns;
 
         Statement stmt = dbConn.createStatement();
-        LOG.debug("Going to execute query <select ntxn_next from NEXT_TXN_ID " +
-            " for update>");
-        ResultSet rs =
-            stmt.executeQuery("select ntxn_next from NEXT_TXN_ID for update");
+        String s = "select ntxn_next from NEXT_TXN_ID";
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = stmt.executeQuery(s);
         if (!rs.next()) {
           throw new MetaException("Transaction database not properly " +
               "configured, can't find next transaction id.");
         }
         long first = rs.getLong(1);
-        String s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
+        s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
         long now = getDbTime(dbConn);
@@ -292,7 +291,7 @@ public class TxnHandler {
   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
     long txnid = rqst.getTxnid();
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         List<Long> txnids = new ArrayList<Long>(1);
         txnids.add(txnid);
@@ -327,7 +326,7 @@ public class TxnHandler {
       throws NoSuchTxnException, TxnAbortedException,  MetaException {
     long txnid = rqst.getTxnid();
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         Statement stmt = dbConn.createStatement();
         // Before we do the commit heartbeat the txn.  This is slightly odd in that we're going to
@@ -382,7 +381,7 @@ public class TxnHandler {
   public LockResponse lock(LockRequest rqst)
       throws NoSuchTxnException, TxnAbortedException, MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         return lock(dbConn, rqst, true);
       } catch (SQLException e) {
@@ -407,7 +406,7 @@ public class TxnHandler {
   public LockResponse lockNoWait(LockRequest rqst)
       throws NoSuchTxnException,  TxnAbortedException, MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         return lock(dbConn, rqst, false);
       } catch (SQLException e) {
@@ -432,7 +431,7 @@ public class TxnHandler {
   public LockResponse checkLock(CheckLockRequest rqst)
       throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         long extLockId = rqst.getLockid();
         // Clean up timed out locks
@@ -468,7 +467,7 @@ public class TxnHandler {
   public void unlock(UnlockRequest rqst)
       throws NoSuchLockException, TxnOpenException, MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         // Odd as it seems, we need to heartbeat first because this touches the
         // lock table and assures that our locks are still valid.  If they are
@@ -519,7 +518,7 @@ public class TxnHandler {
   }
 
   public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
-    Connection dbConn = getDbConn();
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     ShowLocksResponse rsp = new ShowLocksResponse();
     List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
     try {
@@ -571,7 +570,7 @@ public class TxnHandler {
   public void heartbeat(HeartbeatRequest ids)
       throws NoSuchTxnException,  NoSuchLockException, TxnAbortedException, MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         heartbeatLock(dbConn, ids.getLockid());
         heartbeatTxn(dbConn, ids.getTxnid());
@@ -597,7 +596,7 @@ public class TxnHandler {
   public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
       throws MetaException {
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse();
       Set<Long> nosuch = new HashSet<Long>();
       Set<Long> aborted = new HashSet<Long>();
@@ -634,12 +633,12 @@ public class TxnHandler {
   public void compact(CompactionRequest rqst) throws MetaException {
     // Put a compaction request in the queue.
     try {
-      Connection dbConn = getDbConn();
+      Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
       try {
         Statement stmt = dbConn.createStatement();
 
         // Get the id for the next entry in the queue
-        String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID for update";
+        String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID";
         LOG.debug("going to execute query <" + s + ">");
         ResultSet rs = stmt.executeQuery(s);
         if (!rs.next()) {
@@ -717,7 +716,7 @@ public class TxnHandler {
 
   public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
     ShowCompactResponse response = new ShowCompactResponse();
-    Connection dbConn = getDbConn();
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     try {
       Statement stmt = dbConn.createStatement();
       String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
@@ -765,7 +764,7 @@ public class TxnHandler {
    * For testing only, do not use.
    */
   int numLocksInLockTable() throws SQLException, MetaException {
-    Connection dbConn = getDbConn();
+    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
     try {
       Statement stmt = dbConn.createStatement();
       String s = "select count(*) from HIVE_LOCKS";
@@ -794,11 +793,18 @@ public class TxnHandler {
 
   }
 
-  protected Connection getDbConn() throws MetaException {
+  /**
+   * Get a connection to the database
+   * @param isolationLevel desired isolation level.  If you are doing _any_ data modifications
+   *                       you should request serializable, else read committed should be fine.
+   * @return db connection
+   * @throws MetaException if the connection cannot be obtained
+   */
+  protected Connection getDbConn(int isolationLevel) throws MetaException {
     try {
       Connection dbConn = connPool.getConnection();
       dbConn.setAutoCommit(false);
-      dbConn.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+      dbConn.setTransactionIsolation(isolationLevel);
       return dbConn;
     } catch (SQLException e) {
       String msg = "Unable to get jdbc connection from pool, " + e.getMessage();
@@ -999,7 +1005,7 @@ public class TxnHandler {
     }
   }
 
-  private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING};
+  private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING}
 
   // A jump table to figure out whether to wait, acquire,
   // or keep looking .  Since
@@ -1100,12 +1106,10 @@ public class TxnHandler {
       try {
         Statement stmt = dbConn.createStatement();
 
-        // Get the next lock id.  We have to do this as select for update so no
-        // one else reads it and updates it under us.
-        LOG.debug("Going to execute query <select nl_next from NEXT_LOCK_ID " +
-            "for update>");
-        ResultSet rs = stmt.executeQuery("select nl_next from NEXT_LOCK_ID " +
-            "for update");
+        // Get the next lock id.
+        String s = "select nl_next from NEXT_LOCK_ID";
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = stmt.executeQuery(s);
         if (!rs.next()) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
@@ -1113,7 +1117,7 @@ public class TxnHandler {
               "initialized, no record found in next_lock_id");
         }
         long extLockId = rs.getLong(1);
-        String s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
+        s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
         LOG.debug("Going to commit.");
@@ -1261,7 +1265,6 @@ public class TxnHandler {
         query.append("))");
       }
     }
-    query.append(" for update");
 
     LOG.debug("Going to execute query <" + query.toString() + ">");
     ResultSet rs = stmt.executeQuery(query.toString());
@@ -1360,7 +1363,7 @@ public class TxnHandler {
   }
 
   private void wait(Connection dbConn, Savepoint save) throws SQLException {
-    // Need to rollback because we did a select for update but we didn't
+    // Need to rollback because we did a select that acquired locks but we didn't
     // actually update anything.  Also, we may have locked some locks as
     // acquired that we now want to not acquire.  It's ok to rollback because
     // once we see one wait, we're done, we won't look for more.
@@ -1418,8 +1421,7 @@ public class TxnHandler {
     Statement stmt = dbConn.createStatement();
     long now = getDbTime(dbConn);
     // We need to check whether this transaction is valid and open
-    String s = "select txn_state from TXNS where txn_id = " +
-        txnid + " for update";
+    String s = "select txn_state from TXNS where txn_id = " + txnid;
     LOG.debug("Going to execute query <" + s + ">");
     ResultSet rs = stmt.executeQuery(s);
     if (!rs.next()) {