You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/01/24 02:05:13 UTC
svn commit: r1654442 [1/2] - in /hive/branches/branch-0.14/metastore/src:
java/org/apache/hadoop/hive/metastore/
java/org/apache/hadoop/hive/metastore/txn/
test/org/apache/hadoop/hive/metastore/txn/
Author: ekoifman
Date: Sat Jan 24 01:05:12 2015
New Revision: 1654442
URL: http://svn.apache.org/r1654442
Log:
HIVE-9390 Enhance retry logic wrt DB access in TxnHandler
Modified:
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
hive/branches/branch-0.14/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1654442&r1=1654441&r2=1654442&view=diff
==============================================================================
--- hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Sat Jan 24 01:05:12 2015
@@ -5318,126 +5318,74 @@ public class HiveMetaStore extends Thrif
// Transaction and locking methods
@Override
public GetOpenTxnsResponse get_open_txns() throws TException {
- try {
- return getTxnHandler().getOpenTxns();
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().getOpenTxns();
}
// Transaction and locking methods
@Override
public GetOpenTxnsInfoResponse get_open_txns_info() throws TException {
- try {
- return getTxnHandler().getOpenTxnsInfo();
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().getOpenTxnsInfo();
}
@Override
public OpenTxnsResponse open_txns(OpenTxnRequest rqst) throws TException {
- try {
- return getTxnHandler().openTxns(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().openTxns(rqst);
}
@Override
public void abort_txn(AbortTxnRequest rqst) throws NoSuchTxnException, TException {
- try {
- getTxnHandler().abortTxn(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().abortTxn(rqst);
}
@Override
public void commit_txn(CommitTxnRequest rqst)
throws NoSuchTxnException, TxnAbortedException, TException {
- try {
- getTxnHandler().commitTxn(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().commitTxn(rqst);
}
@Override
public LockResponse lock(LockRequest rqst)
throws NoSuchTxnException, TxnAbortedException, TException {
- try {
- return getTxnHandler().lock(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().lock(rqst);
}
@Override
public LockResponse check_lock(CheckLockRequest rqst)
throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, TException {
- try {
- return getTxnHandler().checkLock(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().checkLock(rqst);
}
@Override
public void unlock(UnlockRequest rqst)
throws NoSuchLockException, TxnOpenException, TException {
- try {
- getTxnHandler().unlock(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().unlock(rqst);
}
@Override
public ShowLocksResponse show_locks(ShowLocksRequest rqst) throws TException {
- try {
- return getTxnHandler().showLocks(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().showLocks(rqst);
}
@Override
public void heartbeat(HeartbeatRequest ids)
throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, TException {
- try {
- getTxnHandler().heartbeat(ids);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().heartbeat(ids);
}
@Override
public HeartbeatTxnRangeResponse heartbeat_txn_range(HeartbeatTxnRangeRequest rqst)
throws TException {
- try {
- return getTxnHandler().heartbeatTxnRange(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().heartbeatTxnRange(rqst);
}
@Override
public void compact(CompactionRequest rqst) throws TException {
- try {
- getTxnHandler().compact(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ getTxnHandler().compact(rqst);
}
@Override
public ShowCompactResponse show_compact(ShowCompactRequest rqst) throws TException {
- try {
- return getTxnHandler().showCompact(rqst);
- } catch (MetaException e) {
- throw new TException(e);
- }
+ return getTxnHandler().showCompact(rqst);
}
@Override
Modified: hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1654442&r1=1654441&r2=1654442&view=diff
==============================================================================
--- hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/branches/branch-0.14/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Sat Jan 24 01:05:12 2015
@@ -52,51 +52,58 @@ public class CompactionTxnHandler extend
* or runAs set since these are only potential compactions not actual ones.
*/
public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Connection dbConn = null;
Set<CompactionInfo> response = new HashSet<CompactionInfo>();
Statement stmt = null;
try {
- stmt = dbConn.createStatement();
- // Check for completed transactions
- String s = "select distinct ctc_database, ctc_table, " +
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ // Check for completed transactions
+ String s = "select distinct ctc_database, ctc_table, " +
"ctc_partition from COMPLETED_TXN_COMPONENTS";
- LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- while (rs.next()) {
- CompactionInfo info = new CompactionInfo();
- info.dbname = rs.getString(1);
- info.tableName = rs.getString(2);
- info.partName = rs.getString(3);
- response.add(info);
- }
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ response.add(info);
+ }
- // Check for aborted txns
- s = "select tc_database, tc_table, tc_partition " +
+ // Check for aborted txns
+ s = "select tc_database, tc_table, tc_partition " +
"from TXNS, TXN_COMPONENTS " +
"where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
"group by tc_database, tc_table, tc_partition " +
"having count(*) > " + maxAborted;
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- while (rs.next()) {
- CompactionInfo info = new CompactionInfo();
- info.dbname = rs.getString(1);
- info.tableName = rs.getString(2);
- info.partName = rs.getString(3);
- info.tooManyAborts = true;
- response.add(info);
- }
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ info.tooManyAborts = true;
+ response.add(info);
+ }
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e) {
- LOG.error("Unable to connect to transaction database " + e.getMessage());
- } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database " + e.getMessage());
+ checkRetryable(dbConn, e, "findPotentialCompactions");
+ } finally {
+ closeDbConn(dbConn);
+ closeStmt(stmt);
+ }
+ return response;
+ }
+ catch (RetryException e) {
+ return findPotentialCompactions(maxAborted);
}
- return response;
}
/**
@@ -107,35 +114,31 @@ public class CompactionTxnHandler extend
*/
public void setRunAs(long cq_id, String user) throws MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
- stmt = dbConn.createStatement();
- String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
- LOG.debug("Going to execute update <" + s + ">");
- if (stmt.executeUpdate(s) != 1) {
- LOG.error("Unable to update compaction record");
- LOG.debug("Going to rollback");
- dbConn.rollback();
- }
- LOG.debug("Going to commit");
- dbConn.commit();
- } catch (SQLException e) {
- LOG.error("Unable to update compaction queue, " + e.getMessage());
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "setRunAs");
- } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
- }
- } catch (DeadlockException e) {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
+ LOG.debug("Going to execute update <" + s + ">");
+ if (stmt.executeUpdate(s) != 1) {
+ LOG.error("Unable to update compaction record");
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to update compaction queue, " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "setRunAs");
+ } finally {
+ closeDbConn(dbConn);
+ closeStmt(stmt);
+ }
+ } catch (RetryException e) {
setRunAs(cq_id, user);
- } finally {
- deadlockCnt = 0;
}
}
@@ -147,14 +150,15 @@ public class CompactionTxnHandler extend
*/
public CompactionInfo findNextToCompact(String workerId) throws MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
CompactionInfo info = new CompactionInfo();
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
String s = "select cq_id, cq_database, cq_table, cq_partition, " +
- "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
+ "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
@@ -175,7 +179,7 @@ public class CompactionTxnHandler extend
// Now, update this record as being worked on by this worker.
long now = getDbTime(dbConn);
s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
- "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
+ "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
LOG.debug("Going to execute update <" + s + ">");
if (stmt.executeUpdate(s) != 1) {
LOG.error("Unable to update compaction record");
@@ -187,38 +191,34 @@ public class CompactionTxnHandler extend
return info;
} catch (SQLException e) {
LOG.error("Unable to select next element for compaction, " + e.getMessage());
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "findNextToCompact");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findNextToCompact");
throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
closeStmt(stmt);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
return findNextToCompact(workerId);
- } finally {
- deadlockCnt = 0;
}
}
/**
* This will mark an entry in the queue as compacted
* and put it in the ready to clean state.
- * @param info info on the compaciton entry to mark as compacted.
+ * @param info info on the compaction entry to mark as compacted.
*/
public void markCompacted(CompactionInfo info) throws MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
- "cq_worker_id = null where cq_id = " + info.id;
+ "cq_worker_id = null where cq_id = " + info.id;
LOG.debug("Going to execute update <" + s + ">");
if (stmt.executeUpdate(s) != 1) {
LOG.error("Unable to update compaction record");
@@ -228,23 +228,18 @@ public class CompactionTxnHandler extend
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
- try {
- LOG.error("Unable to update compaction queue " + e.getMessage());
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "markCompacted");
+ LOG.error("Unable to update compaction queue " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "markCompacted");
throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
closeStmt(stmt);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
markCompacted(info);
- } finally {
- deadlockCnt = 0;
}
}
@@ -254,45 +249,48 @@ public class CompactionTxnHandler extend
* @return information on the entry in the queue.
*/
public List<CompactionInfo> findReadyToClean() throws MetaException {
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Connection dbConn = null;
List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
Statement stmt = null;
try {
- stmt = dbConn.createStatement();
- String s = "select cq_id, cq_database, cq_table, cq_partition, " +
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "select cq_id, cq_database, cq_table, cq_partition, " +
"cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
- LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- while (rs.next()) {
- CompactionInfo info = new CompactionInfo();
- info.id = rs.getLong(1);
- info.dbname = rs.getString(2);
- info.tableName = rs.getString(3);
- info.partName = rs.getString(4);
- switch (rs.getString(5).charAt(0)) {
- case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
- case MINOR_TYPE: info.type = CompactionType.MINOR; break;
- default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.id = rs.getLong(1);
+ info.dbname = rs.getString(2);
+ info.tableName = rs.getString(3);
+ info.partName = rs.getString(4);
+ switch (rs.getString(5).charAt(0)) {
+ case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
+ case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+ default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ }
+ info.runAs = rs.getString(6);
+ rc.add(info);
}
- info.runAs = rs.getString(6);
- rc.add(info);
- }
- LOG.debug("Going to rollback");
- dbConn.rollback();
- return rc;
- } catch (SQLException e) {
- LOG.error("Unable to select next element for cleaning, " + e.getMessage());
- try {
LOG.debug("Going to rollback");
dbConn.rollback();
- } catch (SQLException e1) {
- }
- throw new MetaException("Unable to connect to transaction database " +
+ return rc;
+ } catch (SQLException e) {
+ LOG.error("Unable to select next element for cleaning, " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findReadyToClean");
+ throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
- } finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ } finally {
+ closeDbConn(dbConn);
+ closeStmt(stmt);
+ }
+ } catch (RetryException e) {
+ return findReadyToClean();
}
}
@@ -303,9 +301,10 @@ public class CompactionTxnHandler extend
*/
public void markCleaned(CompactionInfo info) throws MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
LOG.debug("Going to execute update <" + s + ">");
@@ -318,20 +317,20 @@ public class CompactionTxnHandler extend
// Remove entries from completed_txn_components as well, so we don't start looking there
// again.
s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " +
- "ctc_table = '" + info.tableName + "'";
+ "ctc_table = '" + info.tableName + "'";
if (info.partName != null) {
s += " and ctc_partition = '" + info.partName + "'";
}
LOG.debug("Going to execute update <" + s + ">");
if (stmt.executeUpdate(s) < 1) {
LOG.error("Expected to remove at least one row from completed_txn_components when " +
- "marking compaction entry as clean!");
+ "marking compaction entry as clean!");
}
s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
- TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
- info.tableName + "'";
+ TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
+ info.tableName + "'";
if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
LOG.debug("Going to execute update <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
@@ -371,23 +370,18 @@ public class CompactionTxnHandler extend
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
- try {
- LOG.error("Unable to delete from compaction queue " + e.getMessage());
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "markCleaned");
+ LOG.error("Unable to delete from compaction queue " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "markCleaned");
throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
closeStmt(stmt);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
markCleaned(info);
- } finally {
- deadlockCnt = 0;
}
}
@@ -396,13 +390,14 @@ public class CompactionTxnHandler extend
*/
public void cleanEmptyAbortedTxns() throws MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
String s = "select txn_id from TXNS where " +
- "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
- "txn_state = '" + TXN_ABORTED + "'";
+ "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
+ "txn_state = '" + TXN_ABORTED + "'";
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
Set<Long> txnids = new HashSet<Long>();
@@ -425,21 +420,16 @@ public class CompactionTxnHandler extend
} catch (SQLException e) {
LOG.error("Unable to delete from txns table " + e.getMessage());
LOG.debug("Going to rollback");
- try {
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "cleanEmptyAbortedTxns");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "cleanEmptyAbortedTxns");
throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
closeStmt(stmt);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
cleanEmptyAbortedTxns();
- } finally {
- deadlockCnt = 0;
}
}
@@ -454,13 +444,14 @@ public class CompactionTxnHandler extend
*/
public void revokeFromLocalWorkers(String hostname) throws MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
- + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
- + hostname + "%'";
+ + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
+ + hostname + "%'";
LOG.debug("Going to execute update <" + s + ">");
// It isn't an error if the following returns no rows, as the local workers could have died
// with nothing assigned to them.
@@ -468,24 +459,19 @@ public class CompactionTxnHandler extend
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
- try {
- LOG.error("Unable to change dead worker's records back to initiated state " +
- e.getMessage());
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "revokeFromLocalWorkers");
+ LOG.error("Unable to change dead worker's records back to initiated state " +
+ e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "revokeFromLocalWorkers");
throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
closeStmt(stmt);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
revokeFromLocalWorkers(hostname);
- } finally {
- deadlockCnt = 0;
}
}
@@ -500,14 +486,15 @@ public class CompactionTxnHandler extend
*/
public void revokeTimedoutWorkers(long timeout) throws MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- long latestValidStart = getDbTime(dbConn) - timeout;
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ long latestValidStart = getDbTime(dbConn) - timeout;
stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
- + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
- + latestValidStart;
+ + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
+ + latestValidStart;
LOG.debug("Going to execute update <" + s + ">");
// It isn't an error if the following returns no rows, as the local workers could have died
// with nothing assigned to them.
@@ -515,24 +502,19 @@ public class CompactionTxnHandler extend
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
- try {
- LOG.error("Unable to change dead worker's records back to initiated state " +
- e.getMessage());
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "revokeTimedoutWorkers");
+ LOG.error("Unable to change dead worker's records back to initiated state " +
+ e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "revokeTimedoutWorkers");
throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
closeStmt(stmt);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
revokeTimedoutWorkers(timeout);
- } finally {
- deadlockCnt = 0;
}
}
@@ -543,53 +525,55 @@ public class CompactionTxnHandler extend
* @throws MetaException
*/
public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Connection dbConn = null;
Statement stmt = null;
ResultSet rs = null;
try {
- String quote = getIdentifierQuoteString(dbConn);
- stmt = dbConn.createStatement();
- StringBuilder bldr = new StringBuilder();
- bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ String quote = getIdentifierQuoteString(dbConn);
+ stmt = dbConn.createStatement();
+ StringBuilder bldr = new StringBuilder();
+ bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
.append(" FROM ")
.append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS"))
- .append(quote)
+ .append(quote)
.append(" WHERE ")
.append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname)
- .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
- .append(" = '").append(ci.tableName).append("'");
- if (ci.partName != null) {
- bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
+ .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
+ .append(" = '").append(ci.tableName).append("'");
+ if (ci.partName != null) {
+ bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
.append(ci.partName).append("'");
- }
- String s = bldr.toString();
+ }
+ String s = bldr.toString();
/*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" :
"PART_COL_STATS")
+ " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+ (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/
- LOG.debug("Going to execute <" + s + ">");
- rs = stmt.executeQuery(s);
- List<String> columns = new ArrayList<String>();
- while(rs.next()) {
- columns.add(rs.getString(1));
- }
- LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
- (ci.partName == null ? "" : "/" + ci.partName));
- dbConn.commit();
- return columns;
- } catch (SQLException e) {
- try {
+ LOG.debug("Going to execute <" + s + ">");
+ rs = stmt.executeQuery(s);
+ List<String> columns = new ArrayList<String>();
+ while (rs.next()) {
+ columns.add(rs.getString(1));
+ }
+ LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
+ (ci.partName == null ? "" : "/" + ci.partName));
+ dbConn.commit();
+ return columns;
+ } catch (SQLException e) {
LOG.error("Failed to find columns to analyze stats on for " + ci.tableName +
- (ci.partName == null ? "" : "/" + ci.partName), e);
- dbConn.rollback();
- } catch (SQLException e1) {
- //nothing we can do here
+ (ci.partName == null ? "" : "/" + ci.partName), e);
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findColumnsWithStats");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
}
- throw new MetaException("Unable to connect to transaction database " +
- StringUtils.stringifyException(e));
- } finally {
- close(rs, stmt, dbConn);
+ } catch (RetryException ex) {
+ return findColumnsWithStats(ci);
}
}
}