You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@hive.apache.org by ga...@apache.org on 2014/11/06 21:46:41 UTC
svn commit: r1637215 - in /hive/trunk/metastore/src:
java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
Author: gates
Date: Thu Nov 6 20:46:41 2014
New Revision: 1637215
URL: http://svn.apache.org/r1637215
Log:
HIVE-8711 DB deadlocks not handled in TxnHandler for Postgres, Oracle, and SQLServer (Alan Gates, reviewed by Eugene Koifman)
Modified:
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1637215&r1=1637214&r2=1637215&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Thu Nov 6 20:46:41 2014
@@ -127,7 +127,7 @@ public class CompactionTxnHandler extend
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "setRunAs");
+ detectDeadlock(dbConn, e, "setRunAs");
} finally {
closeDbConn(dbConn);
closeStmt(stmt);
@@ -192,7 +192,7 @@ public class CompactionTxnHandler extend
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "findNextToCompact");
+ detectDeadlock(dbConn, e, "findNextToCompact");
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -234,7 +234,7 @@ public class CompactionTxnHandler extend
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "markCompacted");
+ detectDeadlock(dbConn, e, "markCompacted");
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -377,7 +377,7 @@ public class CompactionTxnHandler extend
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "markCleaned");
+ detectDeadlock(dbConn, e, "markCleaned");
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -429,7 +429,7 @@ public class CompactionTxnHandler extend
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "cleanEmptyAbortedTxns");
+ detectDeadlock(dbConn, e, "cleanEmptyAbortedTxns");
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -475,7 +475,7 @@ public class CompactionTxnHandler extend
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "revokeFromLocalWorkers");
+ detectDeadlock(dbConn, e, "revokeFromLocalWorkers");
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -522,7 +522,7 @@ public class CompactionTxnHandler extend
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "revokeTimedoutWorkers");
+ detectDeadlock(dbConn, e, "revokeTimedoutWorkers");
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1637215&r1=1637214&r2=1637215&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Thu Nov 6 20:46:41 2014
@@ -65,13 +65,13 @@ public class TxnHandler {
static final protected char TXN_OPEN = 'o';
// Lock states
- static final private char LOCK_ACQUIRED = 'a';
- static final private char LOCK_WAITING = 'w';
+ static final protected char LOCK_ACQUIRED = 'a';
+ static final protected char LOCK_WAITING = 'w';
// Lock types
- static final private char LOCK_EXCLUSIVE = 'e';
- static final private char LOCK_SHARED = 'r';
- static final private char LOCK_SEMI_SHARED = 'w';
+ static final protected char LOCK_EXCLUSIVE = 'e';
+ static final protected char LOCK_SHARED = 'r';
+ static final protected char LOCK_SEMI_SHARED = 'w';
static final private int ALLOWED_REPEATED_DEADLOCKS = 5;
static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
@@ -301,7 +301,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "openTxns");
+ detectDeadlock(dbConn, e, "openTxns");
throw new MetaException("Unable to select from transaction database "
+ StringUtils.stringifyException(e));
} finally {
@@ -336,7 +336,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "abortTxn");
+ detectDeadlock(dbConn, e, "abortTxn");
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
@@ -393,7 +393,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "commitTxn");
+ detectDeadlock(dbConn, e, "commitTxn");
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
@@ -419,7 +419,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "lock");
+ detectDeadlock(dbConn, e, "lock");
throw new MetaException("Unable to update transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -444,7 +444,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "lockNoWait");
+ detectDeadlock(dbConn, e, "lockNoWait");
throw new MetaException("Unable to update transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -479,7 +479,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "checkLock");
+ detectDeadlock(dbConn, e, "checkLock");
throw new MetaException("Unable to update transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -534,7 +534,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "unlock");
+ detectDeadlock(dbConn, e, "unlock");
throw new MetaException("Unable to update transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -613,7 +613,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "heartbeat");
+ detectDeadlock(dbConn, e, "heartbeat");
throw new MetaException("Unable to select from transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -652,7 +652,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "heartbeatTxnRange");
+ detectDeadlock(dbConn, e, "heartbeatTxnRange");
throw new MetaException("Unable to select from transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -735,7 +735,7 @@ public class TxnHandler {
dbConn.rollback();
} catch (SQLException e1) {
}
- detectDeadlock(e, "compact");
+ detectDeadlock(dbConn, e, "compact");
throw new MetaException("Unable to select from transaction database " +
StringUtils.stringifyException(e));
} finally {
@@ -898,15 +898,30 @@ public class TxnHandler {
* Determine if an exception was a deadlock. Unfortunately there is no standard way to do
* this, so we have to inspect the error messages and catch the telltale signs for each
* different database.
+ * @param conn database connection
* @param e exception that was thrown.
* @param caller name of the method calling this
* @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock
* detected and retry count has not been exceeded.
*/
- protected void detectDeadlock(SQLException e, String caller) throws DeadlockException {
- final String mysqlDeadlock =
- "Deadlock found when trying to get lock; try restarting transaction";
- if (e.getMessage().contains(mysqlDeadlock) || e instanceof SQLTransactionRollbackException) {
+ protected void detectDeadlock(Connection conn,
+ SQLException e,
+ String caller) throws DeadlockException, MetaException {
+
+ // If you change this function, remove the @Ignore from TestTxnHandler.deadlockDetected()
+ // to test these changes.
+ // MySQL and MSSQL use 40001 as the state code for rollback. Postgres uses 40001 and 40P01.
+ // Oracle seems to return different SQLStates each time, but the message always contains
+ // "deadlock detected", so I've used that instead.
+ // Derby and newer MySQL drivers use the new SQLTransactionRollbackException.
+ if (dbProduct == null) {
+ determineDatabaseProduct(conn);
+ }
+ if (e instanceof SQLTransactionRollbackException ||
+ ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
+ dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
+ (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
+ (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")))) {
if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
LOG.warn("Deadlock detected in " + caller + ", trying again.");
throw new DeadlockException();
Modified: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java?rev=1637215&r1=1637214&r2=1637215&view=diff
==============================================================================
--- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (original)
+++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java Thu Nov 6 20:46:41 2014
@@ -20,13 +20,18 @@ package org.apache.hadoop.hive.metastore
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -1082,6 +1087,115 @@ public class TestTxnHandler {
for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]);
}
+ @Test
+ @Ignore
+ public void deadlockDetected() throws Exception {
+ Connection conn = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Statement stmt = conn.createStatement();
+ long now = txnHandler.getDbTime(conn);
+ stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+ "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+ "'scooby.com')");
+ stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+ "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+ "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+ txnHandler.LOCK_WAITING + "', '" + txnHandler.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
+ "'scooby.com')");
+ conn.commit();
+ txnHandler.closeDbConn(conn);
+
+ final MetaStoreThread.BooleanPointer sawDeadlock = new MetaStoreThread.BooleanPointer();
+
+ final Connection conn1 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ final Connection conn2 = txnHandler.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ try {
+
+ for (int i = 0; i < 5; i++) {
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ try {
+ updateTxns(conn1);
+ updateLocks(conn1);
+ Thread.sleep(1000);
+ conn1.commit();
+ LOG.debug("no exception, no deadlock");
+ } catch (SQLException e) {
+ try {
+ txnHandler.detectDeadlock(conn1, e, "thread t1");
+ LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+ e.getSQLState() + " class of exception is " + e.getClass().getName() +
+ " msg is <" + e.getMessage() + ">");
+ } catch (TxnHandler.DeadlockException de) {
+ LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+ "exception is " + e.getClass().getName() + " msg is <" + e
+ .getMessage() + ">");
+ sawDeadlock.boolVal = true;
+ }
+ }
+ conn1.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ try {
+ updateLocks(conn2);
+ updateTxns(conn2);
+ Thread.sleep(1000);
+ conn2.commit();
+ LOG.debug("no exception, no deadlock");
+ } catch (SQLException e) {
+ try {
+ txnHandler.detectDeadlock(conn2, e, "thread t2");
+ LOG.debug("Got an exception, but not a deadlock, SQLState is " +
+ e.getSQLState() + " class of exception is " + e.getClass().getName() +
+ " msg is <" + e.getMessage() + ">");
+ } catch (TxnHandler.DeadlockException de) {
+ LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
+ "exception is " + e.getClass().getName() + " msg is <" + e
+ .getMessage() + ">");
+ sawDeadlock.boolVal = true;
+ }
+ }
+ conn2.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ t1.start();
+ t2.start();
+ t1.join();
+ t2.join();
+ if (sawDeadlock.boolVal) break;
+ }
+ assertTrue(sawDeadlock.boolVal);
+ } finally {
+ conn1.rollback();
+ txnHandler.closeDbConn(conn1);
+ conn2.rollback();
+ txnHandler.closeDbConn(conn2);
+ }
+ }
+
+ private void updateTxns(Connection conn) throws SQLException {
+ Statement stmt = conn.createStatement();
+ stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
+ }
+
+ private void updateLocks(Connection conn) throws SQLException {
+ Statement stmt = conn.createStatement();
+ stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1");
+ }
+
@Before
public void setUp() throws Exception {
TxnDbUtil.prepDb();