You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/01/23 20:59:24 UTC
svn commit: r1654355 [10/27] - in /hive/branches/llap: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/con...
Modified: hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Fri Jan 23 19:59:11 2015
@@ -77,7 +77,7 @@ public class TxnHandler {
static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName());
static private DataSource connPool;
- private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock
+ private final static Object lockLock = new Object(); // Random object to lock on for the lock
// method
/**
@@ -87,10 +87,13 @@ public class TxnHandler {
protected HiveConf conf;
protected DatabaseProduct dbProduct;
- // Transaction timeout, in milliseconds.
+ // (End user) Transaction timeout, in milliseconds.
private long timeout;
private String identifierQuoteString; // quotes to use for quoting tables, where necessary
+ private final long retryInterval;
+ private final int retryLimit;
+ private int retryNum;
// DEADLOCK DETECTION AND HANDLING
// A note to developers of this class. ALWAYS access HIVE_LOCKS before TXNS to avoid deadlock
@@ -125,113 +128,122 @@ public class TxnHandler {
timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
deadlockCnt = 0;
buildJumpTable();
+ retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS);
+ retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
+
}
public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
- // We need to figure out the current transaction number and the list of
- // open transactions. To avoid needing a transaction on the underlying
- // database we'll look at the current transaction number first. If it
- // subsequently shows up in the open list that's ok.
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- Statement stmt = null;
try {
- stmt = dbConn.createStatement();
- String s = "select ntxn_next - 1 from NEXT_TXN_ID";
- LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction tables not properly " +
+ // We need to figure out the current transaction number and the list of
+ // open transactions. To avoid needing a transaction on the underlying
+ // database we'll look at the current transaction number first. If it
+ // subsequently shows up in the open list that's ok.
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new MetaException("Transaction tables not properly " +
"initialized, no record found in next_txn_id");
- }
- long hwm = rs.getLong(1);
- if (rs.wasNull()) {
- throw new MetaException("Transaction tables not properly " +
+ }
+ long hwm = rs.getLong(1);
+ if (rs.wasNull()) {
+ throw new MetaException("Transaction tables not properly " +
"initialized, null record found in next_txn_id");
- }
-
- List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
- s = "select txn_id, txn_state, txn_user, txn_host from TXNS";
- LOG.debug("Going to execute query<" + s + ">");
- rs = stmt.executeQuery(s);
- while (rs.next()) {
- char c = rs.getString(2).charAt(0);
- TxnState state;
- switch (c) {
- case TXN_ABORTED:
- state = TxnState.ABORTED;
- break;
+ }
- case TXN_OPEN:
- state = TxnState.OPEN;
- break;
+ List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
+ s = "select txn_id, txn_state, txn_user, txn_host from TXNS";
+ LOG.debug("Going to execute query<" + s + ">");
+ rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ char c = rs.getString(2).charAt(0);
+ TxnState state;
+ switch (c) {
+ case TXN_ABORTED:
+ state = TxnState.ABORTED;
+ break;
+
+ case TXN_OPEN:
+ state = TxnState.OPEN;
+ break;
- default:
- throw new MetaException("Unexpected transaction state " + c +
+ default:
+ throw new MetaException("Unexpected transaction state " + c +
" found in txns table");
+ }
+ txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
}
- txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
- }
- LOG.debug("Going to rollback");
- dbConn.rollback();
- return new GetOpenTxnsInfoResponse(hwm, txnInfo);
- } catch (SQLException e) {
- try {
LOG.debug("Going to rollback");
dbConn.rollback();
- } catch (SQLException e1) {
- }
- throw new MetaException("Unable to select from transaction database, "
+ return new GetOpenTxnsInfoResponse(hwm, txnInfo);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "getOpenTxnsInfo");
+ throw new MetaException("Unable to select from transaction database: " + getMessage(e)
+ StringUtils.stringifyException(e));
- } finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ return getOpenTxnsInfo();
}
}
public GetOpenTxnsResponse getOpenTxns() throws MetaException {
- // We need to figure out the current transaction number and the list of
- // open transactions. To avoid needing a transaction on the underlying
- // database we'll look at the current transaction number first. If it
- // subsequently shows up in the open list that's ok.
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- Statement stmt = null;
try {
- timeOutTxns(dbConn);
- stmt = dbConn.createStatement();
- String s = "select ntxn_next - 1 from NEXT_TXN_ID";
- LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction tables not properly " +
+ // We need to figure out the current transaction number and the list of
+ // open transactions. To avoid needing a transaction on the underlying
+ // database we'll look at the current transaction number first. If it
+ // subsequently shows up in the open list that's ok.
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ timeOutTxns(dbConn);
+ stmt = dbConn.createStatement();
+ String s = "select ntxn_next - 1 from NEXT_TXN_ID";
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new MetaException("Transaction tables not properly " +
"initialized, no record found in next_txn_id");
- }
- long hwm = rs.getLong(1);
- if (rs.wasNull()) {
- throw new MetaException("Transaction tables not properly " +
+ }
+ long hwm = rs.getLong(1);
+ if (rs.wasNull()) {
+ throw new MetaException("Transaction tables not properly " +
"initialized, null record found in next_txn_id");
- }
+ }
- Set<Long> openList = new HashSet<Long>();
- s = "select txn_id from TXNS";
- LOG.debug("Going to execute query<" + s + ">");
- rs = stmt.executeQuery(s);
- while (rs.next()) {
- openList.add(rs.getLong(1));
- }
- LOG.debug("Going to rollback");
- dbConn.rollback();
- return new GetOpenTxnsResponse(hwm, openList);
- } catch (SQLException e) {
- try {
+ Set<Long> openList = new HashSet<Long>();
+ s = "select txn_id from TXNS";
+ LOG.debug("Going to execute query<" + s + ">");
+ rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ openList.add(rs.getLong(1));
+ }
LOG.debug("Going to rollback");
dbConn.rollback();
- } catch (SQLException e1) {
- }
- throw new MetaException("Unable to select from transaction database, "
+ return new GetOpenTxnsResponse(hwm, openList);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "getOpenTxns");
+ throw new MetaException("Unable to select from transaction database, "
+ StringUtils.stringifyException(e));
- } finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ return getOpenTxns();
}
}
@@ -259,12 +271,13 @@ public class TxnHandler {
public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
int numTxns = rqst.getNum_txns();
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
// Make sure the user has not requested an insane amount of txns.
int maxTxns = HiveConf.getIntVar(conf,
- HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
+ HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
if (numTxns > maxTxns) numTxns = maxTxns;
stmt = dbConn.createStatement();
@@ -273,7 +286,7 @@ public class TxnHandler {
ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction database not properly " +
- "configured, can't find next transaction id.");
+ "configured, can't find next transaction id.");
}
long first = rs.getLong(1);
s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
@@ -281,8 +294,8 @@ public class TxnHandler {
stmt.executeUpdate(s);
long now = getDbTime(dbConn);
s = "insert into TXNS (txn_id, txn_state, txn_started, " +
- "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " +
- now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')";
+ "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " +
+ now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')";
LOG.debug("Going to prepare statement <" + s + ">");
PreparedStatement ps = dbConn.prepareStatement(s);
List<Long> txnIds = new ArrayList<Long>(numTxns);
@@ -296,30 +309,26 @@ public class TxnHandler {
dbConn.commit();
return new OpenTxnsResponse(txnIds);
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "openTxns");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "openTxns");
throw new MetaException("Unable to select from transaction database "
+ StringUtils.stringifyException(e));
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
return openTxns(rqst);
- } finally {
- deadlockCnt = 0;
}
}
public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
long txnid = rqst.getTxnid();
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
List<Long> txnids = new ArrayList<Long>(1);
txnids.add(txnid);
if (abortTxns(dbConn, txnids) != 1) {
@@ -331,31 +340,27 @@ public class TxnHandler {
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "abortTxn");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "abortTxn");
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
abortTxn(rqst);
- } finally {
- deadlockCnt = 0;
}
}
public void commitTxn(CommitTxnRequest rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException {
+ throws NoSuchTxnException, TxnAbortedException, MetaException {
long txnid = rqst.getTxnid();
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
// Before we do the commit heartbeat the txn. This is slightly odd in that we're going to
// commit it, but it does two things. One, it makes sure the transaction is still valid.
@@ -367,11 +372,11 @@ public class TxnHandler {
// Move the record from txn_components into completed_txn_components so that the compactor
// knows where to look to compact.
String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
- "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
+ "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute insert <" + s + ">");
if (stmt.executeUpdate(s) < 1) {
LOG.warn("Expected to move at least one record from txn_components to " +
- "completed_txn_components when committing txn!");
+ "completed_txn_components when committing txn!");
}
// Always access TXN_COMPONENTS before HIVE_LOCKS;
@@ -388,80 +393,68 @@ public class TxnHandler {
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "commitTxn");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "commitTxn");
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
commitTxn(rqst);
- } finally {
- deadlockCnt = 0;
}
}
public LockResponse lock(LockRequest rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException {
+ throws NoSuchTxnException, TxnAbortedException, MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
return lock(dbConn, rqst, true);
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "lock");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "lock");
throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
return lock(rqst);
- } finally {
- deadlockCnt = 0;
}
}
public LockResponse lockNoWait(LockRequest rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException {
+ throws NoSuchTxnException, TxnAbortedException, MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
return lock(dbConn, rqst, false);
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "lockNoWait");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "lockNoWait");
throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
return lockNoWait(rqst);
- } finally {
- deadlockCnt = 0;
}
}
public LockResponse checkLock(CheckLockRequest rqst)
- throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
long extLockId = rqst.getLockid();
// Clean up timed out locks
timeOutLocks(dbConn);
@@ -474,31 +467,27 @@ public class TxnHandler {
if (txnid > 0) heartbeatTxn(dbConn, txnid);
return checkLock(dbConn, extLockId, true);
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "checkLock");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "checkLock");
throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
return checkLock(rqst);
- } finally {
- deadlockCnt = 0;
}
}
public void unlock(UnlockRequest rqst)
- throws NoSuchLockException, TxnOpenException, MetaException {
+ throws NoSuchLockException, TxnOpenException, MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
// Odd as it seems, we need to heartbeat first because this touches the
// lock table and assures that our locks our still valid. If they are
// not, this will throw an exception and the heartbeat will fail.
@@ -512,8 +501,8 @@ public class TxnHandler {
LOG.debug("Going to rollback");
dbConn.rollback();
String msg = "Unlocking locks associated with transaction" +
- " not permitted. Lockid " + extLockId + " is associated with " +
- "transaction " + txnid;
+ " not permitted. Lockid " + extLockId + " is associated with " +
+ "transaction " + txnid;
LOG.error(msg);
throw new TxnOpenException(msg);
}
@@ -529,97 +518,96 @@ public class TxnHandler {
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "unlock");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "unlock");
throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
unlock(rqst);
- } finally {
- deadlockCnt = 0;
}
}
public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- ShowLocksResponse rsp = new ShowLocksResponse();
- List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
- Statement stmt = null;
try {
- stmt = dbConn.createStatement();
+ Connection dbConn = null;
+ ShowLocksResponse rsp = new ShowLocksResponse();
+ List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
- String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
+ String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
"hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS";
- LOG.debug("Doing to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- while (rs.next()) {
- ShowLocksResponseElement e = new ShowLocksResponseElement();
- e.setLockid(rs.getLong(1));
- long txnid = rs.getLong(2);
- if (!rs.wasNull()) e.setTxnid(txnid);
- e.setDbname(rs.getString(3));
- e.setTablename(rs.getString(4));
- String partition = rs.getString(5);
- if (partition != null) e.setPartname(partition);
- switch (rs.getString(6).charAt(0)) {
- case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
- case LOCK_WAITING: e.setState(LockState.WAITING); break;
- default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0));
- }
- switch (rs.getString(7).charAt(0)) {
- case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
- case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
- case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
- default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0));
- }
- e.setLastheartbeat(rs.getLong(8));
- long acquiredAt = rs.getLong(9);
- if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
- e.setUser(rs.getString(10));
- e.setHostname(rs.getString(11));
- elems.add(e);
- }
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e) {
- throw new MetaException("Unable to select from transaction database " +
+ LOG.debug("Doing to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ ShowLocksResponseElement e = new ShowLocksResponseElement();
+ e.setLockid(rs.getLong(1));
+ long txnid = rs.getLong(2);
+ if (!rs.wasNull()) e.setTxnid(txnid);
+ e.setDbname(rs.getString(3));
+ e.setTablename(rs.getString(4));
+ String partition = rs.getString(5);
+ if (partition != null) e.setPartname(partition);
+ switch (rs.getString(6).charAt(0)) {
+ case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
+ case LOCK_WAITING: e.setState(LockState.WAITING); break;
+ default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0));
+ }
+ switch (rs.getString(7).charAt(0)) {
+ case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
+ case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
+ case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
+ default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0));
+ }
+ e.setLastheartbeat(rs.getLong(8));
+ long acquiredAt = rs.getLong(9);
+ if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
+ e.setUser(rs.getString(10));
+ e.setHostname(rs.getString(11));
+ elems.add(e);
+ }
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ } catch (SQLException e) {
+ checkRetryable(dbConn, e, "showLocks");
+ throw new MetaException("Unable to select from transaction database " +
StringUtils.stringifyException(e));
- } finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ rsp.setLocks(elems);
+ return rsp;
+ } catch (RetryException e) {
+ return showLocks(rqst);
}
- rsp.setLocks(elems);
- return rsp;
}
public void heartbeat(HeartbeatRequest ids)
- throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
heartbeatLock(dbConn, ids.getLockid());
heartbeatTxn(dbConn, ids.getTxnid());
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "heartbeat");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "heartbeat");
throw new MetaException("Unable to select from transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
heartbeat(ids);
} finally {
deadlockCnt = 0;
@@ -627,15 +615,16 @@ public class TxnHandler {
}
public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
- throws MetaException {
+ throws MetaException {
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse();
Set<Long> nosuch = new HashSet<Long>();
Set<Long> aborted = new HashSet<Long>();
rsp.setNosuch(nosuch);
rsp.setAborted(aborted);
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
try {
heartbeatTxn(dbConn, txn);
@@ -647,18 +636,15 @@ public class TxnHandler {
}
return rsp;
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "heartbeatTxnRange");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "heartbeatTxnRange");
throw new MetaException("Unable to select from transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
return heartbeatTxnRange(rqst);
}
}
@@ -666,9 +652,10 @@ public class TxnHandler {
public void compact(CompactionRequest rqst) throws MetaException {
// Put a compaction request in the queue.
try {
- Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ Connection dbConn = null;
Statement stmt = null;
try {
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
// Get the id for the next entry in the queue
@@ -679,7 +666,7 @@ public class TxnHandler {
LOG.debug("Going to rollback");
dbConn.rollback();
throw new MetaException("Transaction tables not properly initiated, " +
- "no record found in next_compaction_queue_id");
+ "no record found in next_compaction_queue_id");
}
long id = rs.getLong(1);
s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
@@ -687,7 +674,7 @@ public class TxnHandler {
stmt.executeUpdate(s);
StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
- "cq_table, ");
+ "cq_table, ");
String partName = rqst.getPartitionname();
if (partName != null) buf.append("cq_partition, ");
buf.append("cq_state, cq_type");
@@ -730,71 +717,69 @@ public class TxnHandler {
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
- try {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e1) {
- }
- detectDeadlock(dbConn, e, "compact");
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "compact");
throw new MetaException("Unable to select from transaction database " +
- StringUtils.stringifyException(e));
+ StringUtils.stringifyException(e));
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
}
- } catch (DeadlockException e) {
+ } catch (RetryException e) {
compact(rqst);
- } finally {
- deadlockCnt = 0;
}
}
public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>());
- Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ Connection dbConn = null;
Statement stmt = null;
try {
- stmt = dbConn.createStatement();
- String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
- "cq_start, cq_run_as from COMPACTION_QUEUE";
- LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- while (rs.next()) {
- ShowCompactResponseElement e = new ShowCompactResponseElement();
- e.setDbname(rs.getString(1));
- e.setTablename(rs.getString(2));
- e.setPartitionname(rs.getString(3));
- switch (rs.getString(4).charAt(0)) {
- case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
- case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
- case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
- default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
- }
- switch (rs.getString(5).charAt(0)) {
- case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
- case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
- default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
- }
- e.setWorkerid(rs.getString(6));
- e.setStart(rs.getLong(7));
- e.setRunAs(rs.getString(8));
- response.addToCompacts(e);
- }
- LOG.debug("Going to rollback");
- dbConn.rollback();
- } catch (SQLException e) {
- LOG.debug("Going to rollback");
try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
+ "cq_start, cq_run_as from COMPACTION_QUEUE";
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ ShowCompactResponseElement e = new ShowCompactResponseElement();
+ e.setDbname(rs.getString(1));
+ e.setTablename(rs.getString(2));
+ e.setPartitionname(rs.getString(3));
+ switch (rs.getString(4).charAt(0)) {
+ case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break;
+ case WORKING_STATE: e.setState(WORKING_RESPONSE); break;
+ case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
+ default: throw new MetaException("Unexpected compaction state " + rs.getString(4));
+ }
+ switch (rs.getString(5).charAt(0)) {
+ case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break;
+ case MINOR_TYPE: e.setType(CompactionType.MINOR); break;
+ default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ }
+ e.setWorkerid(rs.getString(6));
+ e.setStart(rs.getLong(7));
+ e.setRunAs(rs.getString(8));
+ response.addToCompacts(e);
+ }
+ LOG.debug("Going to rollback");
dbConn.rollback();
- } catch (SQLException e1) {
- }
- throw new MetaException("Unable to select from transaction database " +
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "showCompact");
+ throw new MetaException("Unable to select from transaction database " +
StringUtils.stringifyException(e));
- } finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ return response;
+ } catch (RetryException e) {
+ return showCompact(rqst);
}
- return response;
}
/**
@@ -828,7 +813,7 @@ public class TxnHandler {
return previous_timeout;
}
- protected class DeadlockException extends Exception {
+ protected class RetryException extends Exception {
}
@@ -839,26 +824,28 @@ public class TxnHandler {
* @return db connection
* @throws MetaException if the connection cannot be obtained
*/
- protected Connection getDbConn(int isolationLevel) throws MetaException {
+ protected Connection getDbConn(int isolationLevel) throws SQLException {
+ Connection dbConn = connPool.getConnection();
+ dbConn.setAutoCommit(false);
+ dbConn.setTransactionIsolation(isolationLevel);
+ return dbConn;
+ }
+
+ void rollbackDBConn(Connection dbConn) {
try {
- Connection dbConn = connPool.getConnection();
- dbConn.setAutoCommit(false);
- dbConn.setTransactionIsolation(isolationLevel);
- return dbConn;
+ if (dbConn != null) dbConn.rollback();
} catch (SQLException e) {
- String msg = "Unable to get jdbc connection from pool, " + e.getMessage();
- throw new MetaException(msg);
+ LOG.warn("Failed to rollback db connection " + getMessage(e));
}
}
-
protected void closeDbConn(Connection dbConn) {
try {
if (dbConn != null) dbConn.close();
} catch (SQLException e) {
- LOG.warn("Failed to close db connection " + e.getMessage());
+ LOG.warn("Failed to close db connection " + getMessage(e));
}
}
-
+
/**
* Close statement instance.
* @param stmt statement instance.
@@ -867,7 +854,7 @@ public class TxnHandler {
try {
if (stmt != null) stmt.close();
} catch (SQLException e) {
- LOG.warn("Failed to close statement " + e.getMessage());
+ LOG.warn("Failed to close statement " + getMessage(e));
}
}
@@ -882,7 +869,7 @@ public class TxnHandler {
}
}
catch(SQLException ex) {
- LOG.warn("Failed to close statement " + ex.getMessage());
+ LOG.warn("Failed to close statement " + getMessage(ex));
}
}
@@ -895,18 +882,18 @@ public class TxnHandler {
closeDbConn(dbConn);
}
/**
- * Determine if an exception was a deadlock. Unfortunately there is no standard way to do
+ * Determine if an exception was such that it makes sense to retry. Unfortunately there is no standard way to do
* this, so we have to inspect the error messages and catch the telltale signs for each
* different database.
* @param conn database connection
* @param e exception that was thrown.
* @param caller name of the method calling this
- * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock
+ * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when deadlock
* detected and retry count has not been exceeded.
*/
- protected void detectDeadlock(Connection conn,
+ protected void checkRetryable(Connection conn,
SQLException e,
- String caller) throws DeadlockException, MetaException {
+ String caller) throws RetryException, MetaException {
// If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
// to test these changes.
@@ -915,23 +902,45 @@ public class TxnHandler {
// so I've tried to capture the different error messages (there appear to be fewer different
// error messages than SQL states).
// Derby and newer MySQL driver use the new SQLTransactionRollbackException
- if (dbProduct == null) {
+ if (dbProduct == null && conn != null) {
determineDatabaseProduct(conn);
}
if (e instanceof SQLTransactionRollbackException ||
- ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
- dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
- (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
- (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")
- || e.getMessage().contains("can't serialize access for this transaction")))) {
+ ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
+ dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
+ (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
+ (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")
+ || e.getMessage().contains("can't serialize access for this transaction")))) {
if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
LOG.warn("Deadlock detected in " + caller + ", trying again.");
- throw new DeadlockException();
+ throw new RetryException();
} else {
LOG.error("Too many repeated deadlocks in " + caller + ", giving up.");
deadlockCnt = 0;
}
}
+ else if(isRetryable(e)) {
+ //in MSSQL this means Communication Link Failure
+ if(retryNum++ < retryLimit) {
+ try {
+ Thread.sleep(retryInterval);
+ }
+ catch(InterruptedException ex) {
+ //
+ }
+ LOG.warn("Retryable error detected in " + caller + ", trying again: " + getMessage(e));
+ throw new RetryException();
+ }
+ else {
+ LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
+ retryNum = 0;
+ }
+ }
+ else {
+ //if here, we got something that will propagate the error (rather than retry), so reset counters
+ deadlockCnt = 0;
+ retryNum = 0;
+ }
}
/**
@@ -1073,10 +1082,10 @@ public class TxnHandler {
@Override
public String toString() {
return "extLockId:" + Long.toString(extLockId) + " intLockId:" +
- intLockId + " txnId:" + Long.toString
- (txnId) + " db:" + db + " table:" + table + " partition:" +
- partition + " state:" + (state == null ? "null" : state.toString())
- + " type:" + (type == null ? "null" : type.toString());
+ intLockId + " txnId:" + Long.toString
+ (txnId) + " db:" + db + " table:" + table + " partition:" +
+ partition + " state:" + (state == null ? "null" : state.toString())
+ + " type:" + (type == null ? "null" : type.toString());
}
}
@@ -1088,11 +1097,11 @@ public class TxnHandler {
public int compare(LockInfo info1, LockInfo info2) {
// We sort by state (acquired vs waiting) and then by extLockId.
if (info1.state == LockState.ACQUIRED &&
- info2.state != LockState .ACQUIRED) {
+ info2.state != LockState .ACQUIRED) {
return -1;
}
if (info1.state != LockState.ACQUIRED &&
- info2.state == LockState .ACQUIRED) {
+ info2.state == LockState .ACQUIRED) {
return 1;
}
if (info1.extLockId < info2.extLockId) {
@@ -1124,7 +1133,7 @@ public class TxnHandler {
private void checkQFileTestHack() {
boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST) ||
- HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
if (hackOn) {
LOG.info("Hacking in canned values for transaction manager");
// Set up the transaction/locking db in the derby metastore
@@ -1135,7 +1144,7 @@ public class TxnHandler {
// We may have already created the tables and thus don't need to redo it.
if (!e.getMessage().contains("already exists")) {
throw new RuntimeException("Unable to set up transaction database for" +
- " testing: " + e.getMessage());
+ " testing: " + e.getMessage());
}
}
}
@@ -1153,7 +1162,7 @@ public class TxnHandler {
int updateCnt = 0;
try {
stmt = dbConn.createStatement();
-
+
// delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS
StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in (");
boolean first = true;
@@ -1165,7 +1174,7 @@ public class TxnHandler {
buf.append(')');
LOG.debug("Going to execute update <" + buf.toString() + ">");
stmt.executeUpdate(buf.toString());
-
+
buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in (");
first = true;
for (Long id : txnids) {
@@ -1176,7 +1185,7 @@ public class TxnHandler {
buf.append(')');
LOG.debug("Going to execute update <" + buf.toString() + ">");
updateCnt = stmt.executeUpdate(buf.toString());
-
+
LOG.debug("Going to commit");
dbConn.commit();
} finally {
@@ -1202,7 +1211,7 @@ public class TxnHandler {
* @throws TxnAbortedException
*/
private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait)
- throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
+ throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
// We want to minimize the number of concurrent lock requests being issued. If we do not we
// get a large number of deadlocks in the database, since this method has to both clean
// timedout locks and insert new locks. This synchronization barrier will not eliminiate all
@@ -1227,7 +1236,7 @@ public class TxnHandler {
LOG.debug("Going to rollback");
dbConn.rollback();
throw new MetaException("Transaction tables not properly " +
- "initialized, no record found in next_lock_id");
+ "initialized, no record found in next_lock_id");
}
long extLockId = rs.getLong(1);
s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
@@ -1252,8 +1261,8 @@ public class TxnHandler {
s = "insert into TXN_COMPONENTS " +
"(tc_txnid, tc_database, tc_table, tc_partition) " +
"values (" + txnid + ", '" + dbName + "', " +
- (tblName == null ? "null" : "'" + tblName + "'") + ", " +
- (partName == null ? "null" : "'" + partName + "'") + ")";
+ (tblName == null ? "null" : "'" + tblName + "'") + ", " +
+ (partName == null ? "null" : "'" + partName + "'") + ")";
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
}
@@ -1275,13 +1284,13 @@ public class TxnHandler {
long now = getDbTime(dbConn);
s = "insert into HIVE_LOCKS " +
" (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
- "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
- " values (" + extLockId + ", " +
- + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
- dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
- + ", " + (partName == null ? "null" : "'" + partName + "'") +
- ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" +
- rqst.getUser() + "', '" + rqst.getHostname() + "')";
+ "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
+ " values (" + extLockId + ", " +
+ + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
+ dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
+ + ", " + (partName == null ? "null" : "'" + partName + "'") +
+ ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" +
+ rqst.getUser() + "', '" + rqst.getHostname() + "')";
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
}
@@ -1305,7 +1314,7 @@ public class TxnHandler {
private LockResponse checkLock(Connection dbConn,
long extLockId,
boolean alwaysCommit)
- throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
+ throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);
LockResponse response = new LockResponse();
response.setLockid(extLockId);
@@ -1313,8 +1322,8 @@ public class TxnHandler {
LOG.debug("Setting savepoint");
Savepoint save = dbConn.setSavepoint();
StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
- "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type from HIVE_LOCKS where hl_db in (");
+ "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
+ "hl_lock_type from HIVE_LOCKS where hl_db in (");
Set<String> strings = new HashSet<String>(locksBeingChecked.size());
for (LockInfo info : locksBeingChecked) {
@@ -1430,7 +1439,7 @@ public class TxnHandler {
// lock the whole database and we need to check it. Otherwise,
// check if they are operating on the same table, if not, move on.
if (locks[index].table != null && locks[i].table != null
- && !locks[index].table.equals(locks[i].table)) {
+ && !locks[index].table.equals(locks[i].table)) {
continue;
}
@@ -1438,30 +1447,30 @@ public class TxnHandler {
// lock the whole table and we need to check it. Otherwise,
// check if they are operating on the same partition, if not, move on.
if (locks[index].partition != null && locks[i].partition != null
- && !locks[index].partition.equals(locks[i].partition)) {
+ && !locks[index].partition.equals(locks[i].partition)) {
continue;
}
// We've found something that matches what we're trying to lock,
// so figure out if we can lock it too.
switch (jumpTable.get(locks[index].type).get(locks[i].type).get
- (locks[i].state)) {
- case ACQUIRE:
- acquire(dbConn, stmt, extLockId, info.intLockId);
- acquired = true;
- break;
- case WAIT:
- wait(dbConn, save);
- if (alwaysCommit) {
- // In the case where lockNoWait has been called we don't want to commit because
- // it's going to roll everything back. In every other case we want to commit here.
- LOG.debug("Going to commit");
- dbConn.commit();
- }
- response.setState(LockState.WAITING);
- return response;
- case KEEP_LOOKING:
- continue;
+ (locks[i].state)) {
+ case ACQUIRE:
+ acquire(dbConn, stmt, extLockId, info.intLockId);
+ acquired = true;
+ break;
+ case WAIT:
+ wait(dbConn, save);
+ if (alwaysCommit) {
+ // In the case where lockNoWait has been called we don't want to commit because
+ // it's going to roll everything back. In every other case we want to commit here.
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ }
+ response.setState(LockState.WAITING);
+ return response;
+ case KEEP_LOOKING:
+ continue;
}
if (acquired) break; // We've acquired this lock component,
// so get out of the loop and look at the next component.
@@ -1494,18 +1503,18 @@ public class TxnHandler {
}
private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId)
- throws SQLException, NoSuchLockException, MetaException {
+ throws SQLException, NoSuchLockException, MetaException {
long now = getDbTime(dbConn);
String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
- "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
- extLockId + " and hl_lock_int_id = " + intLockId;
+ "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+ extLockId + " and hl_lock_int_id = " + intLockId;
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
if (rc < 1) {
LOG.debug("Going to rollback");
dbConn.rollback();
throw new NoSuchLockException("No such lock: (" + extLockId + "," +
- + intLockId + ")");
+ + intLockId + ")");
}
// We update the database, but we don't commit because there may be other
// locks together with this, and we only want to acquire one if we can
@@ -1514,7 +1523,7 @@ public class TxnHandler {
// Heartbeats on the lock table. This commits, so do not enter it with any state
private void heartbeatLock(Connection dbConn, long extLockId)
- throws NoSuchLockException, SQLException, MetaException {
+ throws NoSuchLockException, SQLException, MetaException {
// If the lock id is 0, then there are no locks in this heartbeat
if (extLockId == 0) return;
Statement stmt = null;
@@ -1523,7 +1532,7 @@ public class TxnHandler {
long now = getDbTime(dbConn);
String s = "update HIVE_LOCKS set hl_last_heartbeat = " +
- now + " where hl_lock_ext_id = " + extLockId;
+ now + " where hl_lock_ext_id = " + extLockId;
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
if (rc < 1) {
@@ -1540,7 +1549,7 @@ public class TxnHandler {
// Heartbeats on the txn table. This commits, so do not enter it with any state
private void heartbeatTxn(Connection dbConn, long txnid)
- throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException {
+ throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException {
// If the txnid is 0, then there are no transactions in this heartbeat
if (txnid == 0) return;
Statement stmt = null;
@@ -1560,10 +1569,10 @@ public class TxnHandler {
LOG.debug("Going to rollback");
dbConn.rollback();
throw new TxnAbortedException("Transaction " + txnid +
- " already aborted");
+ " already aborted");
}
s = "update TXNS set txn_last_heartbeat = " + now +
- " where txn_id = " + txnid;
+ " where txn_id = " + txnid;
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
LOG.debug("Going to commit");
@@ -1575,17 +1584,17 @@ public class TxnHandler {
// NEVER call this function without first calling heartbeat(long, long)
private long getTxnIdFromLockId(Connection dbConn, long extLockId)
- throws NoSuchLockException, MetaException, SQLException {
+ throws NoSuchLockException, MetaException, SQLException {
Statement stmt = null;
try {
stmt = dbConn.createStatement();
String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
- extLockId;
+ extLockId;
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("This should never happen! We already " +
- "checked the lock existed but now we can't find it!");
+ "checked the lock existed but now we can't find it!");
}
long txnid = rs.getLong(1);
LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid));
@@ -1597,13 +1606,13 @@ public class TxnHandler {
// NEVER call this function without first calling heartbeat(long, long)
private List<LockInfo> getLockInfoFromLockId(Connection dbConn, long extLockId)
- throws NoSuchLockException, MetaException, SQLException {
+ throws NoSuchLockException, MetaException, SQLException {
Statement stmt = null;
try {
stmt = dbConn.createStatement();
String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
- "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
- "hl_lock_ext_id = " + extLockId;
+ "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " +
+ "hl_lock_ext_id = " + extLockId;
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
boolean sawAtLeastOne = false;
@@ -1614,7 +1623,7 @@ public class TxnHandler {
}
if (!sawAtLeastOne) {
throw new MetaException("This should never happen! We already " +
- "checked the lock existed but now we can't find it!");
+ "checked the lock existed but now we can't find it!");
}
return ourLockInfo;
} finally {
@@ -1632,7 +1641,7 @@ public class TxnHandler {
stmt = dbConn.createStatement();
// Remove any timed out locks from the table.
String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
- (now - timeout);
+ (now - timeout);
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
LOG.debug("Going to commit");
@@ -1652,7 +1661,7 @@ public class TxnHandler {
stmt = dbConn.createStatement();
// Abort any timed out locks from the table.
String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN +
- "' and txn_last_heartbeat < " + (now - timeout);
+ "' and txn_last_heartbeat < " + (now - timeout);
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
List<Long> deadTxns = new ArrayList<Long>();
@@ -1675,12 +1684,12 @@ public class TxnHandler {
String passwd;
try {
passwd = ShimLoader.getHadoopShims().getPassword(conf,
- HiveConf.ConfVars.METASTOREPWD.varname);
+ HiveConf.ConfVars.METASTOREPWD.varname);
} catch (IOException err) {
throw new SQLException("Error getting metastore password", err);
}
String connectionPooler = HiveConf.getVar(conf,
- HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase();
+ HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase();
if ("bonecp".equals(connectionPooler)) {
BoneCPConfig config = new BoneCPConfig();
@@ -1696,22 +1705,22 @@ public class TxnHandler {
// This doesn't get used, but it's still necessary, see
// http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup
PoolableConnectionFactory poolConnFactory =
- new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
+ new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true);
connPool = new PoolingDataSource(objectPool);
} else {
throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler);
}
}
- private static synchronized void buildJumpTable() {
+ private static synchronized void buildJumpTable() {
if (jumpTable != null) return;
jumpTable =
- new HashMap<LockType, Map<LockType, Map<LockState, LockAction>>>(3);
+ new HashMap<LockType, Map<LockType, Map<LockState, LockAction>>>(3);
// SR: Lock we are trying to acquire is shared read
Map<LockType, Map<LockState, LockAction>> m =
- new HashMap<LockType, Map<LockState, LockAction>>(3);
+ new HashMap<LockType, Map<LockState, LockAction>>(3);
jumpTable.put(LockType.SHARED_READ, m);
// SR.SR: Lock we are examining is shared read
@@ -1743,7 +1752,7 @@ public class TxnHandler {
// that something is blocking it that would not block a read.
m2.put(LockState.WAITING, LockAction.KEEP_LOOKING);
- // SR.E: Lock we are examining is exclusive
+ // SR.E: Lock we are examining is exclusive
m2 = new HashMap<LockState, LockAction>(2);
m.put(LockType.EXCLUSIVE, m2);
@@ -1777,7 +1786,7 @@ public class TxnHandler {
m2.put(LockState.ACQUIRED, LockAction.WAIT);
m2.put(LockState.WAITING, LockAction.WAIT);
- // SW.E: Lock we are examining is exclusive
+ // SW.E: Lock we are examining is exclusive
m2 = new HashMap<LockState, LockAction>(2);
m.put(LockType.EXCLUSIVE, m2);
@@ -1805,7 +1814,7 @@ public class TxnHandler {
m2.put(LockState.ACQUIRED, LockAction.WAIT);
m2.put(LockState.WAITING, LockAction.WAIT);
- // E.E: Lock we are examining is exclusive
+ // E.E: Lock we are examining is exclusive
m2 = new HashMap<LockState, LockAction>(2);
m.put(LockType.EXCLUSIVE, m2);
@@ -1813,4 +1822,20 @@ public class TxnHandler {
m2.put(LockState.ACQUIRED, LockAction.WAIT);
m2.put(LockState.WAITING, LockAction.WAIT);
}
+ /**
+ * Returns true if {@code ex} should be retried
+ */
+ private static boolean isRetryable(Exception ex) {
+ if(ex instanceof SQLException) {
+ SQLException sqlException = (SQLException)ex;
+ if("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
+ //in MSSQL this means Communication Link Failure
+ return true;
+ }
+ }
+ return false;
+ }
+ private static String getMessage(SQLException ex) {
+ return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
+ }
}
Modified: hive/branches/llap/metastore/src/model/package.jdo
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/model/package.jdo?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/model/package.jdo (original)
+++ hive/branches/llap/metastore/src/model/package.jdo Fri Jan 23 19:59:11 2015
@@ -985,6 +985,41 @@
</field>
</class>
+ <class name="MNotificationLog" table="NOTIFICATION_LOG" identity-type="datastore" detachable="true">
+ <datastore-identity>
+ <column name="NL_ID"/>
+ </datastore-identity>
+ <field name="eventId">
+ <column name="EVENT_ID" jdbc-type="BIGINT" allows-null="false"/>
+ </field>
+ <field name="eventTime">
+ <column name="EVENT_TIME" jdbc-type="INTEGER" allows-null="false"/>
+ </field>
+ <field name="eventType">
+ <column name="EVENT_TYPE" length="32" jdbc-type="VARCHAR" allows-null="false"/>
+ </field>
+ <field name="dbName">
+ <column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/>
+ </field>
+ <field name="tableName">
+ <column name="TBL_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/>
+ </field>
+ <field name="message">
+ <column name="MESSAGE" jdbc-type="LONGVARCHAR"/>
+ </field>
+ </class>
+
+ <!-- I tried to use a sequence here but derby didn't handle it well. -->
+ <class name="MNotificationNextId" table="NOTIFICATION_SEQUENCE" identity-type="datastore" detachable="true">
+ <datastore-identity>
+ <column name="NNI_ID"/>
+ </datastore-identity>
+ <field name="nextEventId">
+ <column name="NEXT_EVENT_ID" jdbc-type="BIGINT" allows-null="false"/>
+ </field>
+ </class>
+
+
</package>
</jdo>
Modified: hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java (original)
+++ hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java Fri Jan 23 19:59:11 2015
@@ -21,12 +21,14 @@ package org.apache.hadoop.hive.metastore
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.SortedSet;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
@@ -36,6 +38,9 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
@@ -349,7 +354,7 @@ public class DummyRawStoreControlledComm
public boolean grantRole(Role role, String userName, PrincipalType principalType,
String grantor, PrincipalType grantorType, boolean grantOption)
throws MetaException, NoSuchObjectException, InvalidObjectException {
- return objectStore.grantRole(role, userName, principalType, grantor, grantorType,
+ return objectStore.grantRole(role, userName, principalType, grantor, grantorType,
grantOption);
}
@@ -403,13 +408,13 @@ public class DummyRawStoreControlledComm
@Override
public List<MDBPrivilege> listPrincipalDBGrants(String principalName,
PrincipalType principalType, String dbName) {
- return objectStore.listPrincipalDBGrants(principalName, principalType, dbName);
+ return objectStore.listPrincipalDBGrants(principalName, principalType, dbName);
}
@Override
public List<MTablePrivilege> listAllTableGrants(String principalName,
PrincipalType principalType, String dbName, String tableName) {
- return objectStore.listAllTableGrants(principalName, principalType,
+ return objectStore.listAllTableGrants(principalName, principalType,
dbName, tableName);
}
@@ -726,4 +731,25 @@ public class DummyRawStoreControlledComm
return null;
}
+ @Override
+ public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
+ return objectStore.getNextNotification(rqst);
+ }
+
+ @Override
+ public void addNotificationEvent(NotificationEvent event) {
+ objectStore.addNotificationEvent(event);
+ }
+
+ @Override
+ public void cleanNotificationEvents(int olderThan) {
+ objectStore.cleanNotificationEvents(olderThan);
+ }
+
+ @Override
+ public CurrentNotificationEventId getCurrentNotificationEventId() {
+ return objectStore.getCurrentNotificationEventId();
+ }
+
+
}
Modified: hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java (original)
+++ hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java Fri Jan 23 19:59:11 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore
import java.util.List;
import java.util.Map;
+import java.util.SortedSet;
import junit.framework.Assert;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
@@ -37,6 +39,9 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
@@ -742,7 +747,28 @@ public class DummyRawStoreForJdoConnecti
throws MetaException {
return null;
}
-
+
+ @Override
+ public NotificationEventResponse getNextNotification(NotificationEventRequest rqst) {
+ return null;
+ }
+
+ @Override
+ public void addNotificationEvent(NotificationEvent event) {
+
+ }
+
+ @Override
+ public void cleanNotificationEvents(int olderThan) {
+
+ }
+
+ @Override
+ public CurrentNotificationEventId getCurrentNotificationEventId() {
+ return null;
+ }
+
+
}
Modified: hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (original)
+++ hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java Fri Jan 23 19:59:11 2015
@@ -1124,11 +1124,11 @@ public class TestTxnHandler {
LOG.debug("no exception, no deadlock");
} catch (SQLException e) {
try {
- txnHandler.detectDeadlock(conn1, e, "thread t1");
+ txnHandler.checkRetryable(conn1, e, "thread t1");
LOG.debug("Got an exception, but not a deadlock, SQLState is " +
e.getSQLState() + " class of exception is " + e.getClass().getName() +
" msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.DeadlockException de) {
+ } catch (TxnHandler.RetryException de) {
LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
"exception is " + e.getClass().getName() + " msg is <" + e
.getMessage() + ">");
@@ -1154,11 +1154,11 @@ public class TestTxnHandler {
LOG.debug("no exception, no deadlock");
} catch (SQLException e) {
try {
- txnHandler.detectDeadlock(conn2, e, "thread t2");
+ txnHandler.checkRetryable(conn2, e, "thread t2");
LOG.debug("Got an exception, but not a deadlock, SQLState is " +
e.getSQLState() + " class of exception is " + e.getClass().getName() +
" msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.DeadlockException de) {
+ } catch (TxnHandler.RetryException de) {
LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
"exception is " + e.getClass().getName() + " msg is <" + e
.getMessage() + ">");
Modified: hive/branches/llap/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/pom.xml?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/pom.xml (original)
+++ hive/branches/llap/pom.xml Fri Jan 23 19:59:11 2015
@@ -49,6 +49,7 @@
<module>serde</module>
<module>service</module>
<module>shims</module>
+ <module>spark-client</module>
<module>testutils</module>
<module>packaging</module>
</modules>
@@ -117,10 +118,10 @@
<commons-pool.version>1.5.4</commons-pool.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<derby.version>10.11.1.1</derby.version>
- <guava.version>11.0.2</guava.version>
+ <guava.version>14.0.1</guava.version>
<groovy.version>2.1.6</groovy.version>
<hadoop-20S.version>1.2.1</hadoop-20S.version>
- <hadoop-23.version>2.5.0</hadoop-23.version>
+ <hadoop-23.version>2.6.0</hadoop-23.version>
<hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
<hbase.hadoop1.version>0.98.3-hadoop1</hbase.hadoop1.version>
<hbase.hadoop2.version>0.98.3-hadoop2</hbase.hadoop2.version>
@@ -146,11 +147,7 @@
<opencsv.version>2.3</opencsv.version>
<mockito-all.version>1.9.5</mockito-all.version>
<mina.version>2.0.0-M5</mina.version>
- <!--netty is not a direct dependency but due to a change
- in artifact name and given that zookeeper < 3.5
- requires netty < 3.6.0 we force hadoops version
- -->
- <netty.version>3.4.0.Final</netty.version>
+ <netty.version>4.0.23.Final</netty.version>
<parquet.version>1.6.0rc3</parquet.version>
<pig.version>0.12.0</pig.version>
<protobuf.version>2.5.0</protobuf.version>
@@ -159,15 +156,19 @@
<ST4.version>4.0.4</ST4.version>
<tez.version>0.5.2</tez.version>
<super-csv.version>2.2.0</super-csv.version>
+ <spark.version>1.2.0</spark.version>
+ <scala.binary.version>2.10</scala.binary.version>
+ <scala.version>2.10.4</scala.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
<snappy.version>0.2</snappy.version>
<wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>
<velocity.version>1.5</velocity.version>
<xerces.version>2.9.1</xerces.version>
- <zookeeper.version>3.4.5</zookeeper.version>
+ <zookeeper.version>3.4.6</zookeeper.version>
<jpam.version>1.1</jpam.version>
<felix.version>2.4.0</felix.version>
<curator.version>2.6.0</curator.version>
+ <jsr305.version>3.0.0</jsr305.version>
</properties>
<repositories>
@@ -203,7 +204,7 @@
<snapshots>
<enabled>false</enabled>
</snapshots>
- </repository>
+ </repository>
<repository>
<id>sonatype-snapshot</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
@@ -213,7 +214,7 @@
<snapshots>
<enabled>false</enabled>
</snapshots>
- </repository>
+ </repository>
</repositories>
<!-- Hadoop dependency management is done at the bottom under profiles -->
@@ -318,7 +319,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
@@ -352,9 +353,9 @@
<version>${log4j.version}</version>
</dependency>
<dependency>
- <groupId>log4j</groupId>
- <artifactId>apache-log4j-extras</artifactId>
- <version>${log4j-extras.version}</version>
+ <groupId>log4j</groupId>
+ <artifactId>apache-log4j-extras</artifactId>
+ <version>${log4j-extras.version}</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
@@ -824,6 +825,7 @@
<LANG>en_US.UTF-8</LANG>
<HADOOP_CLASSPATH>${test.tmp.dir}/conf:${basedir}/${hive.path.to.root}/conf</HADOOP_CLASSPATH>
<HIVE_HADOOP_TEST_CLASSPATH>${test.hive.hadoop.classpath}</HIVE_HADOOP_TEST_CLASSPATH>
+ <SPARK_SUBMIT_CLASSPATH>${spark.home}/lib/spark-assembly-${spark.version}-hadoop2.4.0.jar:${test.hive.hadoop.classpath}</SPARK_SUBMIT_CLASSPATH>
<PATH>${env.PATH}${test.extra.path}</PATH>
</environmentVariables>
<systemPropertyVariables>
@@ -845,6 +847,7 @@
<log4j.debug>true</log4j.debug>
<!-- don't dirty up /tmp -->
<java.io.tmpdir>${test.tmp.dir}</java.io.tmpdir>
+ <spark.home>${spark.home}</spark.home>
<!-- Hadoop's minidfs class uses this -->
<test.build.data>${test.tmp.dir}</test.build.data>
<!-- required by QTestUtil -->
@@ -1114,6 +1117,22 @@
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop-23.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</dependencyManagement>
</profile>
Modified: hive/branches/llap/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/pom.xml?rev=1654355&r1=1654354&r2=1654355&view=diff
==============================================================================
--- hive/branches/llap/ql/pom.xml (original)
+++ hive/branches/llap/ql/pom.xml Fri Jan 23 19:59:11 2015
@@ -65,6 +65,11 @@
<artifactId>hive-shims</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>spark-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- inter-project -->
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
@@ -140,6 +145,12 @@
<artifactId>avro-mapred</artifactId>
<classifier>hadoop2</classifier>
<version>${avro.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.ant</groupId>
@@ -167,6 +178,23 @@
<version>${zookeeper.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>apache-curator</artifactId>
+ <version>${curator.version}</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>${groovy.version}</version>
@@ -317,6 +345,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -415,6 +447,12 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <optional>true</optional>
+ </dependency>
</dependencies>
<profiles>
@@ -657,6 +695,7 @@
<include>org.codehaus.jackson:jackson-mapper-asl</include>
<include>com.google.guava:guava</include>
<include>net.sf.opencsv:opencsv</include>
+ <include>org.apache.hive:spark-client</include>
</includes>
</artifactSet>
<relocations>