You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ga...@apache.org on 2014/11/06 21:46:41 UTC

svn commit: r1637215 - in /hive/trunk/metastore/src: java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java

Author: gates
Date: Thu Nov  6 20:46:41 2014
New Revision: 1637215

URL: http://svn.apache.org/r1637215
Log:
HIVE-8711 DB deadlocks not handled in TxnHandler for Postgres, Oracle, and SQLServer (Alan Gates, reviewed by Eugene Koifman)

Modified:
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1637215&r1=1637214&r2=1637215&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Thu Nov  6 20:46:41 2014
@@ -127,7 +127,7 @@ public class CompactionTxnHandler extend
          dbConn.rollback();
        } catch (SQLException e1) {
        }
-       detectDeadlock(e, "setRunAs");
+       detectDeadlock(dbConn, e, "setRunAs");
      } finally {
        closeDbConn(dbConn);
        closeStmt(stmt);
@@ -192,7 +192,7 @@ public class CompactionTxnHandler extend
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "findNextToCompact");
+        detectDeadlock(dbConn, e, "findNextToCompact");
         throw new MetaException("Unable to connect to transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -234,7 +234,7 @@ public class CompactionTxnHandler extend
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "markCompacted");
+        detectDeadlock(dbConn, e, "markCompacted");
         throw new MetaException("Unable to connect to transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -377,7 +377,7 @@ public class CompactionTxnHandler extend
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "markCleaned");
+        detectDeadlock(dbConn, e, "markCleaned");
         throw new MetaException("Unable to connect to transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -429,7 +429,7 @@ public class CompactionTxnHandler extend
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "cleanEmptyAbortedTxns");
+        detectDeadlock(dbConn, e, "cleanEmptyAbortedTxns");
         throw new MetaException("Unable to connect to transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -475,7 +475,7 @@ public class CompactionTxnHandler extend
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "revokeFromLocalWorkers");
+        detectDeadlock(dbConn, e, "revokeFromLocalWorkers");
         throw new MetaException("Unable to connect to transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -522,7 +522,7 @@ public class CompactionTxnHandler extend
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "revokeTimedoutWorkers");
+        detectDeadlock(dbConn, e, "revokeTimedoutWorkers");
         throw new MetaException("Unable to connect to transaction database " +
             StringUtils.stringifyException(e));
       } finally {

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1637215&r1=1637214&r2=1637215&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Thu Nov  6 20:46:41 2014
@@ -65,13 +65,13 @@ public class TxnHandler {
   static final protected char TXN_OPEN = 'o';
 
   // Lock states
-  static final private char LOCK_ACQUIRED = 'a';
-  static final private char LOCK_WAITING = 'w';
+  static final protected char LOCK_ACQUIRED = 'a';
+  static final protected char LOCK_WAITING = 'w';
 
   // Lock types
-  static final private char LOCK_EXCLUSIVE = 'e';
-  static final private char LOCK_SHARED = 'r';
-  static final private char LOCK_SEMI_SHARED = 'w';
+  static final protected char LOCK_EXCLUSIVE = 'e';
+  static final protected char LOCK_SHARED = 'r';
+  static final protected char LOCK_SEMI_SHARED = 'w';
 
   static final private int ALLOWED_REPEATED_DEADLOCKS = 5;
   static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
@@ -301,7 +301,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "openTxns");
+        detectDeadlock(dbConn, e, "openTxns");
         throw new MetaException("Unable to select from transaction database "
           + StringUtils.stringifyException(e));
       } finally {
@@ -336,7 +336,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "abortTxn");
+        detectDeadlock(dbConn, e, "abortTxn");
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
@@ -393,7 +393,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "commitTxn");
+        detectDeadlock(dbConn, e, "commitTxn");
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
@@ -419,7 +419,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "lock");
+        detectDeadlock(dbConn, e, "lock");
         throw new MetaException("Unable to update transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -444,7 +444,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "lockNoWait");
+        detectDeadlock(dbConn, e, "lockNoWait");
         throw new MetaException("Unable to update transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -479,7 +479,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "checkLock");
+        detectDeadlock(dbConn, e, "checkLock");
         throw new MetaException("Unable to update transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -534,7 +534,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "unlock");
+        detectDeadlock(dbConn, e, "unlock");
         throw new MetaException("Unable to update transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -613,7 +613,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "heartbeat");
+        detectDeadlock(dbConn, e, "heartbeat");
         throw new MetaException("Unable to select from transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -652,7 +652,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "heartbeatTxnRange");
+        detectDeadlock(dbConn, e, "heartbeatTxnRange");
         throw new MetaException("Unable to select from transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -735,7 +735,7 @@ public class TxnHandler {
           dbConn.rollback();
         } catch (SQLException e1) {
         }
-        detectDeadlock(e, "compact");
+        detectDeadlock(dbConn, e, "compact");
         throw new MetaException("Unable to select from transaction database " +
             StringUtils.stringifyException(e));
       } finally {
@@ -898,15 +898,30 @@ public class TxnHandler {
    * Determine if an exception was a deadlock.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
    * different database.
+   * @param conn database connection
    * @param e exception that was thrown.
    * @param caller name of the method calling this
    * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock
    * detected and retry count has not been exceeded.
    */
-  protected void detectDeadlock(SQLException e, String caller) throws DeadlockException {
-    final String mysqlDeadlock =
-        "Deadlock found when trying to get lock; try restarting transaction";
-    if (e.getMessage().contains(mysqlDeadlock) || e instanceof SQLTransactionRollbackException) {
+  protected void detectDeadlock(Connection conn,
+                                SQLException e,
+                                String caller) throws DeadlockException, MetaException {
+
+    // If you change this function, remove the @Ignore from TestTxnHandler.deadlockDetected()
+    // to test these changes.
+    // MySQL and MSSQL use 40001 as the state code for rollback.  Postgres uses 40001 and 40P01.
+    // Oracle seems to return different SQLStates each time, but the message always contains
+    // "deadlock detected", so I've used that instead.
+    // Derby and newer MySQL drivers use the new SQLTransactionRollbackException
+    if (dbProduct == null) {
+      determineDatabaseProduct(conn);
+    }
+    if (e instanceof SQLTransactionRollbackException ||
+        ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
+            dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
+        (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
+        (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")))) {
       if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
         LOG.warn("Deadlock detected in " + caller + ", trying again.");
         throw new DeadlockException();

Modified: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java?rev=1637215&r1=1637214&r2=1637215&view=diff
==============================================================================
--- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (original)
+++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java Thu Nov  6 20:46:41 2014
@@ -20,13 +20,18 @@ package org.apache.hadoop.hive.metastore
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -1082,6 +1087,115 @@ public class TestTxnHandler {
     for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]);
   }
 
+  @Test
+  @Ignore
+  public void deadlockDetected() throws Exception {
+    Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+    Statement stmt = conn.createStatement();
+    long now = txnHandler.getDbTime(conn);
+    stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+        "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+        "'scooby.com')");
+    stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+        "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+        "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+        txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
+        "'scooby.com')");
+    conn.commit();
+    txnHandler.closeDbConn(conn);
+
+    final MetaStoreThread.BooleanPointer sawDeadlock = new MetaStoreThread.BooleanPointer();
+
+    final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+    final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+    try {
+
+      for (int i = 0; i < 5; i++) {
+        Thread t1 = new Thread() {
+          @Override
+          public void run() {
+            try {
+              try {
+                updateTxns(conn1);
+                updateLocks(conn1);
+                Thread.sleep(1000);
+                conn1.commit();
+                LOG.debug("no exception, no deadlock");
+              } catch (SQLException e) {
+                try {
+                  txnHandler.detectDeadlock(conn1, e, "thread t1");
+                  LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+                      e.getSQLState() + " class of exception is " + e.getClass().getName() +
+                      " msg is <" + e.getMessage() + ">");
+                } catch (TxnHandler.DeadlockException de) {
+                  LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+                      "exception is " + e.getClass().getName() + " msg is <" + e
+                      .getMessage() + ">");
+                  sawDeadlock.boolVal = true;
+                }
+              }
+              conn1.rollback();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+
+        Thread t2 = new Thread() {
+          @Override
+          public void run() {
+            try {
+              try {
+                updateLocks(conn2);
+                updateTxns(conn2);
+                Thread.sleep(1000);
+                conn2.commit();
+                LOG.debug("no exception, no deadlock");
+              } catch (SQLException e) {
+                try {
+                  txnHandler.detectDeadlock(conn2, e, "thread t2");
+                  LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+                      e.getSQLState() + " class of exception is " + e.getClass().getName() +
+                      " msg is <" + e.getMessage() + ">");
+                } catch (TxnHandler.DeadlockException de) {
+                  LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+                      "exception is " + e.getClass().getName() + " msg is <" + e
+                      .getMessage() + ">");
+                  sawDeadlock.boolVal = true;
+                }
+              }
+              conn2.rollback();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+
+        t1.start();
+        t2.start();
+        t1.join();
+        t2.join();
+        if (sawDeadlock.boolVal) break;
+      }
+      assertTrue(sawDeadlock.boolVal);
+    } finally {
+      conn1.rollback();
+      txnHandler.closeDbConn(conn1);
+      conn2.rollback();
+      txnHandler.closeDbConn(conn2);
+    }
+  }
+
+  private void updateTxns(Connection conn) throws SQLException {
+    Statement stmt = conn.createStatement();
+    stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
+  }
+
+  private void updateLocks(Connection conn) throws SQLException {
+    Statement stmt = conn.createStatement();
+    stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1");
+  }
+
   @Before
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();