You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/12/14 20:48:22 UTC
hive git commit: HIVE-12620 Misc improvement to Acid module (Eugene
Koifman, reviewed by Wei Zheng, Jason Dere)
Repository: hive
Updated Branches:
refs/heads/master 69f03b86c -> 49dc6452a
HIVE-12620 Misc improvement to Acid module (Eugene Koifman, reviewed by Wei Zheng, Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/49dc6452
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/49dc6452
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/49dc6452
Branch: refs/heads/master
Commit: 49dc6452acf022e463318b0042f7ad918927bc5e
Parents: 69f03b8
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Dec 14 11:46:06 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Dec 14 11:46:20 2015 -0800
----------------------------------------------------------------------
.../hadoop/hive/metastore/txn/TxnHandler.java | 60 ++++++++++++++++----
.../hadoop/hive/ql/lockmgr/DbLockManager.java | 47 ++++++++++++++-
.../hive/ql/lockmgr/TestDbTxnManager.java | 41 ++++++++++---
3 files changed, 126 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/49dc6452/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 8ff2195..50d8892 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -529,8 +529,7 @@ public class TxnHandler {
else {
heartbeatLock(dbConn, extLockId);
}
- closeDbConn(dbConn);
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
return checkLock(dbConn, extLockId);
} catch (SQLException e) {
LOG.debug("Going to rollback");
@@ -1099,6 +1098,10 @@ public class TxnHandler {
LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
}
}
+ else {
+ //make sure we know we saw an error that we don't recognize
+ LOG.info("Non-retryable error: " + getMessage(e));
+ }
}
finally {
/*if this method ends with anything except a retry signal, the caller should fail the operation
@@ -1577,7 +1580,7 @@ public class TxnHandler {
return checkLock(dbConn, extLockId);
} catch (NoSuchLockException e) {
// This should never happen, as we just added the lock id
- throw new MetaException("Couldn't find a lock we just created!");
+ throw new MetaException("Couldn't find a lock we just created! " + e.getMessage());
} finally {
close(rs);
closeStmt(stmt);
@@ -1706,7 +1709,7 @@ public class TxnHandler {
if (index == -1) {
LOG.debug("Going to rollback");
dbConn.rollback();
- throw new MetaException("How did we get here, we heartbeated our lock before we started!");
+ throw new MetaException("How did we get here, we heartbeated our lock before we started! ( " + info + ")");
}
@@ -1972,17 +1975,50 @@ public class TxnHandler {
// open transactions.
private void timeOutLocks(Connection dbConn, long now) {
Statement stmt = null;
+ ResultSet rs = null;
try {
stmt = dbConn.createStatement();
- // Remove any timed out locks from the table.
- String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
- (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is
+ long maxHeartbeatTime = now - timeout;
+ //doing a SELECT first is less efficient but makes it easier to debug things
+ String s = "select distinct hl_lock_ext_id from HIVE_LOCKS where hl_last_heartbeat < " +
+ maxHeartbeatTime + " and hl_txnid = 0";//when txnid is <> 0, the lock is
//associated with a txn and is handled by performTimeOuts()
//want to avoid expiring locks for a txn w/o expiring the txn itself
- LOG.debug("Going to execute update <" + s + ">");
- int deletedLocks = stmt.executeUpdate(s);
+ List<Long> extLockIDs = new ArrayList<>();
+ rs = stmt.executeQuery(s);
+ while(rs.next()) {
+ extLockIDs.add(rs.getLong(1));
+ }
+ rs.close();
+ dbConn.commit();
+ if(extLockIDs.size() <= 0) {
+ return;
+ }
+ int deletedLocks = 0;
+ //include same hl_last_heartbeat condition in case someone heartbeated since the select
+ s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + maxHeartbeatTime + " and hl_txnid = 0" +
+ " and hl_lock_ext_id IN (";
+ int numWholeBatches = extLockIDs.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE;
+ for(int i = 0; i < numWholeBatches; i++) {
+ StringBuilder sb = new StringBuilder(s);
+ for(int j = i * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j < (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j++) {
+ sb.append(extLockIDs.get(j)).append(",");
+ }
+ sb.setCharAt(sb.length() - 1, ')');
+ LOG.debug("Removing expired locks via: " + sb.toString());
+ deletedLocks += stmt.executeUpdate(sb.toString());
+ dbConn.commit();
+ }
+ StringBuilder sb = new StringBuilder(s);
+ for(int i = numWholeBatches * TIMED_OUT_TXN_ABORT_BATCH_SIZE; i < extLockIDs.size(); i++) {
+ sb.append(extLockIDs.get(i)).append(",");
+ }
+ sb.setCharAt(sb.length() - 1, ')');
+ LOG.debug("Removing expired locks via: " + sb.toString());
+ deletedLocks += stmt.executeUpdate(sb.toString());
if(deletedLocks > 0) {
- LOG.info("Deleted " + deletedLocks + " locks from HIVE_LOCKS due to timeout");
+ LOG.info("Deleted " + deletedLocks + " ext locks from HIVE_LOCKS due to timeout (vs. " +
+ extLockIDs.size() + " found. List: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime);
}
LOG.debug("Going to commit");
dbConn.commit();
@@ -1993,6 +2029,7 @@ public class TxnHandler {
catch(Exception ex) {
LOG.error("Failed to purge timedout locks due to: " + ex.getMessage(), ex);
} finally {
+ close(rs);
closeStmt(stmt);
}
}
@@ -2265,7 +2302,8 @@ public class TxnHandler {
//in MSSQL this means Communication Link Failure
return true;
}
- if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState())) {
+ if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
+ sqlException.getMessage().contains("consistent read failure; rollback data not available")) {
return true;
}
//see also https://issues.apache.org/jira/browse/HIVE-9938
http://git-wip-us.apache.org/repos/asf/hive/blob/49dc6452/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 7d58622..7fa57d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -52,6 +52,7 @@ public class DbLockManager implements HiveLockManager{
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
private long MAX_SLEEP;
+ //longer term we should always have a txn id and then we won't need to track locks here at all
private Set<DbHiveLock> locks;
private IMetaStoreClient client;
private long nextSleep = 50;
@@ -115,7 +116,27 @@ public class DbLockManager implements HiveLockManager{
}
long retryDuration = System.currentTimeMillis() - startRetry;
- DbHiveLock hl = new DbHiveLock(res.getLockid());
+ DbHiveLock hl = new DbHiveLock(res.getLockid(), queryId, lock.getTxnid());
+ if(locks.size() > 0) {
+ boolean logMsg = false;
+ for(DbHiveLock l : locks) {
+ if(l.txnId != hl.txnId) {
+ //locks from different transactions detected (or from transaction and read-only query in autocommit)
+ logMsg = true;
+ break;
+ }
+ else if(l.txnId == 0) {
+ if(!l.queryId.equals(hl.queryId)) {
+ //here means no open transaction, but different queries
+ logMsg = true;
+ break;
+ }
+ }
+ }
+ if(logMsg) {
+ LOG.warn("adding new DbHiveLock(" + hl + ") while we are already tracking locks: " + locks);
+ }
+ }
locks.add(hl);
if (res.getState() != LockState.ACQUIRED) {
if(res.getState() == LockState.WAITING) {
@@ -191,10 +212,12 @@ public class DbLockManager implements HiveLockManager{
@Override
public void unlock(HiveLock hiveLock) throws LockException {
long lockId = ((DbHiveLock)hiveLock).lockId;
+ boolean removed = false;
try {
LOG.debug("Unlocking " + hiveLock);
client.unlock(lockId);
- boolean removed = locks.remove(hiveLock);
+ //important to remove after unlock() in case it fails
+ removed = locks.remove(hiveLock);
Metrics metrics = MetricsFactory.getInstance();
if (metrics != null) {
try {
@@ -205,6 +228,9 @@ public class DbLockManager implements HiveLockManager{
}
LOG.debug("Removed a lock " + removed);
} catch (NoSuchLockException e) {
+ //if metastore has no record of this lock, it most likely timed out; either way
+ //there is no point tracking it here any longer
+ removed = locks.remove(hiveLock);
LOG.error("Metastore could find no record of lock " + JavaUtils.lockIdToString(lockId));
throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
} catch (TxnOpenException e) {
@@ -214,10 +240,16 @@ public class DbLockManager implements HiveLockManager{
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
e);
}
+ finally {
+ if(removed) {
+ LOG.debug("Removed a lock " + hiveLock);
+ }
+ }
}
@Override
public void releaseLocks(List<HiveLock> hiveLocks) {
+ LOG.info("releaseLocks: " + hiveLocks);
for (HiveLock lock : hiveLocks) {
try {
unlock(lock);
@@ -225,6 +257,8 @@ public class DbLockManager implements HiveLockManager{
// Not sure why this method doesn't throw any exceptions,
// but since the interface doesn't allow it we'll just swallow them and
// move on.
+ //This OK-ish since releaseLocks() is only called for RO/AC queries; it
+ //would be really bad to eat exceptions here for write operations
}
}
}
@@ -271,10 +305,17 @@ public class DbLockManager implements HiveLockManager{
static class DbHiveLock extends HiveLock {
long lockId;
+ String queryId;
+ long txnId;
DbHiveLock(long id) {
lockId = id;
}
+ DbHiveLock(long id, String queryId, long txnId) {
+ lockId = id;
+ this.queryId = queryId;
+ this.txnId = txnId;
+ }
@Override
public HiveLockObject getHiveLockObject() {
@@ -301,7 +342,7 @@ public class DbLockManager implements HiveLockManager{
}
@Override
public String toString() {
- return JavaUtils.lockIdToString(lockId);
+ return JavaUtils.lockIdToString(lockId) + " queryId=" + queryId + " " + JavaUtils.txnIdToString(txnId);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/49dc6452/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index b7d1d18..f82b85a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -198,9 +199,8 @@ public class TestDbTxnManager {
}
@Test
public void testExceptions() throws Exception {
- WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
+ addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.acquireLocks(qp, ctx, "PeterI");
txnMgr.openTxn("NicholasII");
runReaper();
LockException exception = null;
@@ -240,6 +240,32 @@ public class TestDbTxnManager {
}
@Test
+ public void testLockTimeout() throws Exception {
+ addPartitionInput(newTable(true));
+ QueryPlan qp = new MockQueryPlan(this);
+ //make sure it works with nothing to expire
+ expireLocks(txnMgr, 0);
+ //create a few read locks, all on the same resource
+ for(int i = 0; i < 5; i++) {
+ txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+ }
+ expireLocks(txnMgr, 5);
+ //create a lot of locks
+ for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
+ txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+ }
+ expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+ }
+ private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
+ DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
+ ShowLocksResponse resp = lockManager.getLocks();
+ Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size());
+ runReaper();
+ resp = lockManager.getLocks();
+ Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
+ }
+
+ @Test
public void testReadWrite() throws Exception {
Table t = newTable(true);
addPartitionInput(t);
@@ -359,6 +385,7 @@ public class TestDbTxnManager {
public void setUp() throws Exception {
TxnDbUtil.prepDb();
txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr.getLockManager();//init lock manager
Assert.assertTrue(txnMgr instanceof DbTxnManager);
nextInput = 1;
readEntities = new HashSet<ReadEntity>();
@@ -376,15 +403,13 @@ public class TestDbTxnManager {
}
private static class MockQueryPlan extends QueryPlan {
- private final HashSet<ReadEntity> inputs;
- private final HashSet<WriteEntity> outputs;
+ private final HashSet<ReadEntity> inputs = new HashSet<>();
+ private final HashSet<WriteEntity> outputs = new HashSet<>();
private final String queryId;
MockQueryPlan(TestDbTxnManager test) {
- HashSet<ReadEntity> r = test.readEntities;
- HashSet<WriteEntity> w = test.writeEntities;
- inputs = (r == null) ? new HashSet<ReadEntity>() : r;
- outputs = (w == null) ? new HashSet<WriteEntity>() : w;
+ inputs.addAll(test.readEntities);
+ outputs.addAll(test.writeEntities);
queryId = makeQueryId();
}