You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/11/18 02:03:31 UTC
[2/3] hive git commit: HIVE-11948 Investigate TxnHandler and
CompactionTxnHandler to see where we can improve concurrency (Eugene Koifman,
reviewed by Alan Gates)
HIVE-11948 Investigate TxnHandler and CompactionTxnHandler to see where we can improve concurrency (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a80841b7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a80841b7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a80841b7
Branch: refs/heads/branch-1
Commit: a80841b73431de470cd0e53fcee3d04ca85ef7f5
Parents: b1c1bf2
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue Nov 17 16:43:42 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue Nov 17 16:43:42 2015 -0800
----------------------------------------------------------------------
.../hive/hcatalog/streaming/TestStreaming.java | 6 +-
.../metastore/txn/CompactionTxnHandler.java | 125 +++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 523 ++++++++++++++-----
.../metastore/txn/TestCompactionTxnHandler.java | 37 --
.../hive/metastore/txn/TestTxnHandler.java | 10 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 2 +-
6 files changed, 463 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 58cfbaa..806dbdb 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -618,7 +618,7 @@ public class TestStreaming {
}
@Test
- public void testHearbeat() throws Exception {
+ public void testHeartbeat() throws Exception {
HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null);
DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt);
StreamingConnection connection = endPt.newConnection(false, null);
@@ -632,14 +632,14 @@ public class TestStreaming {
Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size());
ShowLocksResponseElement lock = response.getLocks().get(0);
long acquiredAt = lock.getAcquiredat();
- long heartbeatAt = lock.getAcquiredat();
+ long heartbeatAt = lock.getLastheartbeat();
txnBatch.heartbeat();
response = msClient.showLocks();
Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size());
lock = response.getLocks().get(0);
Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat());
Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() +
- ") > old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() > heartbeatAt);
+ ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt);
}
@Test
public void testTransactionBatchEmptyAbort() throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 44ee5c6..eab801a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -56,6 +56,7 @@ public class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Set<CompactionInfo> response = new HashSet<CompactionInfo>();
Statement stmt = null;
+ ResultSet rs = null;
try {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -64,7 +65,7 @@ public class CompactionTxnHandler extends TxnHandler {
String s = "select distinct ctc_database, ctc_table, " +
"ctc_partition from COMPLETED_TXN_COMPONENTS";
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
while (rs.next()) {
CompactionInfo info = new CompactionInfo();
info.dbname = rs.getString(1);
@@ -72,6 +73,7 @@ public class CompactionTxnHandler extends TxnHandler {
info.partName = rs.getString(3);
response.add(info);
}
+ rs.close();
// Check for aborted txns
s = "select tc_database, tc_table, tc_partition " +
@@ -97,8 +99,7 @@ public class CompactionTxnHandler extends TxnHandler {
LOG.error("Unable to connect to transaction database " + e.getMessage());
checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
} finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
}
return response;
}
@@ -118,7 +119,7 @@ public class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
LOG.debug("Going to execute update <" + s + ">");
@@ -153,46 +154,58 @@ public class CompactionTxnHandler extends TxnHandler {
public CompactionInfo findNextToCompact(String workerId) throws MetaException {
try {
Connection dbConn = null;
- CompactionInfo info = new CompactionInfo();
-
Statement stmt = null;
+ ResultSet rs = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select cq_id, cq_database, cq_table, cq_partition, " +
"cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
if (!rs.next()) {
LOG.debug("No compactions found ready to compact");
dbConn.rollback();
return null;
}
- info.id = rs.getLong(1);
- info.dbname = rs.getString(2);
- info.tableName = rs.getString(3);
- info.partName = rs.getString(4);
- switch (rs.getString(5).charAt(0)) {
- case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
- case MINOR_TYPE: info.type = CompactionType.MINOR; break;
- default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
- }
-
- // Now, update this record as being worked on by this worker.
- long now = getDbTime(dbConn);
- s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
- "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
- LOG.debug("Going to execute update <" + s + ">");
- int updCount = stmt.executeUpdate(s);
- if (updCount != 1) {
+ do {
+ CompactionInfo info = new CompactionInfo();
+ info.id = rs.getLong(1);
+ info.dbname = rs.getString(2);
+ info.tableName = rs.getString(3);
+ info.partName = rs.getString(4);
+ switch (rs.getString(5).charAt(0)) {
+ case MAJOR_TYPE:
+ info.type = CompactionType.MAJOR;
+ break;
+ case MINOR_TYPE:
+ info.type = CompactionType.MINOR;
+ break;
+ default:
+ throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ }
+ // Now, update this record as being worked on by this worker.
+ long now = getDbTime(dbConn);
+ s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
+ "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
+ " AND cq_state='" + INITIATED_STATE + "'";
+ LOG.debug("Going to execute update <" + s + ">");
+ int updCount = stmt.executeUpdate(s);
+ if(updCount == 1) {
+ dbConn.commit();
+ return info;
+ }
+ if(updCount == 0) {
+ LOG.debug("Another Worker picked up " + info);
+ continue;
+ }
LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
- info + ". updCnt=" + updCount);
- LOG.debug("Going to rollback");
+ info + ". updCnt=" + updCount + ".");
dbConn.rollback();
- }
- LOG.debug("Going to commit");
- dbConn.commit();
- return info;
+ return null;
+ } while( rs.next());
+ dbConn.rollback();
+ return null;
} catch (SQLException e) {
LOG.error("Unable to select next element for compaction, " + e.getMessage());
LOG.debug("Going to rollback");
@@ -201,8 +214,7 @@ public class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
}
} catch (RetryException e) {
return findNextToCompact(workerId);
@@ -219,7 +231,7 @@ public class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
"cq_worker_id = null where cq_id = " + info.id;
@@ -240,8 +252,8 @@ public class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeDbConn(dbConn);
closeStmt(stmt);
+ closeDbConn(dbConn);
}
} catch (RetryException e) {
markCompacted(info);
@@ -258,6 +270,7 @@ public class CompactionTxnHandler extends TxnHandler {
List<CompactionInfo> rc = new ArrayList<CompactionInfo>();
Statement stmt = null;
+ ResultSet rs = null;
try {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -265,7 +278,7 @@ public class CompactionTxnHandler extends TxnHandler {
String s = "select cq_id, cq_database, cq_table, cq_partition, " +
"cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
while (rs.next()) {
CompactionInfo info = new CompactionInfo();
info.id = rs.getLong(1);
@@ -291,8 +304,7 @@ public class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
}
} catch (RetryException e) {
return findReadyToClean();
@@ -303,23 +315,28 @@ public class CompactionTxnHandler extends TxnHandler {
* This will remove an entry from the queue after
* it has been compacted.
*
- * todo: possibly a problem? Worker will start with DB in state X (wrt this partition).
+ * todo: Worker will start with DB in state X (wrt this partition).
* while it's working more txns will happen, against partition it's compacting.
* then this will delete state up to X and since then. There may be new delta files created
* between compaction starting and cleaning. These will not be compacted until more
* transactions happen. So this ideally should only delete
* up to TXN_ID that was compacted (i.e. HWM in Worker?) Then this can also run
- * at READ_COMMITTED
+ * at READ_COMMITTED. So this means we'd want to store HWM in COMPACTION_QUEUE when
+ * Worker picks up the job.
*
* Also, by using this method when Worker fails, we prevent future compactions from
- * running until more data is written to tale or compaction is invoked explicitly
+ * running until more data is written to table or compaction is invoked explicitly
* @param info info on the compaction entry to remove
*/
public void markCleaned(CompactionInfo info) throws MetaException {
try {
Connection dbConn = null;
Statement stmt = null;
+ ResultSet rs = null;
try {
+ //do we need serializable? Once we have the HWM as above, no. Before that
+ //it's debatable, but problem described above applies either way
+ //Thus can drop to RC
dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
@@ -344,19 +361,20 @@ public class CompactionTxnHandler extends TxnHandler {
"marking compaction entry as clean!");
}
-
+ //todo: add distinct in query
s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
info.tableName + "'";
if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
LOG.debug("Going to execute update <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
Set<Long> txnids = new HashSet<Long>();
while (rs.next()) txnids.add(rs.getLong(1));
if (txnids.size() > 0) {
// Remove entries from txn_components, as there may be aborted txn components
StringBuilder buf = new StringBuilder();
+ //todo: add a safeguard to make sure IN clause is not too large; break up by txn id
buf.append("delete from TXN_COMPONENTS where tc_txnid in (");
boolean first = true;
for (long id : txnids) {
@@ -394,8 +412,7 @@ public class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
}
} catch (RetryException e) {
markCleaned(info);
@@ -409,14 +426,17 @@ public class CompactionTxnHandler extends TxnHandler {
try {
Connection dbConn = null;
Statement stmt = null;
+ ResultSet rs = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ //Aborted is a terminal state, so nothing about the txn can change
+ //after that, so READ COMMITTED is sufficient.
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select txn_id from TXNS where " +
"txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
"txn_state = '" + TXN_ABORTED + "'";
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
Set<Long> txnids = new HashSet<Long>();
while (rs.next()) txnids.add(rs.getLong(1));
if (txnids.size() > 0) {
@@ -443,8 +463,7 @@ public class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeDbConn(dbConn);
- closeStmt(stmt);
+ close(rs, stmt, dbConn);
}
} catch (RetryException e) {
cleanEmptyAbortedTxns();
@@ -465,7 +484,7 @@ public class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
+ INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
@@ -485,8 +504,8 @@ public class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeDbConn(dbConn);
closeStmt(stmt);
+ closeDbConn(dbConn);
}
} catch (RetryException e) {
revokeFromLocalWorkers(hostname);
@@ -507,7 +526,7 @@ public class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
long latestValidStart = getDbTime(dbConn) - timeout;
stmt = dbConn.createStatement();
String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
@@ -528,8 +547,8 @@ public class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeDbConn(dbConn);
closeStmt(stmt);
+ closeDbConn(dbConn);
}
} catch (RetryException e) {
revokeTimedoutWorkers(timeout);
http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 0b3d565..15b747d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -52,6 +52,11 @@ import java.util.concurrent.TimeUnit;
* and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
* The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
* so keeping the format consistent makes grep'ing the logs much easier.
+ *
+ * Note on HIVE_LOCKS.hl_last_heartbeat.
+ * For locks that are part of transaction, we set this 0 (would rather set it to NULL but
+ * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
+ * transaction in TXNS.
*/
public class TxnHandler {
// Compactor states
@@ -150,12 +155,20 @@ public class TxnHandler {
// subsequently shows up in the open list that's ok.
Connection dbConn = null;
Statement stmt = null;
+ ResultSet rs = null;
try {
+ /**
+ * This method can run at READ_COMMITTED as long as long as
+ * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
+ * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
+ * adding corresponding entries into TXNS. The reason is that any txnid below HWM
+ * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
+ */
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select ntxn_next - 1 from NEXT_TXN_ID";
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction tables not properly " +
"initialized, no record found in next_txn_id");
@@ -165,7 +178,7 @@ public class TxnHandler {
throw new MetaException("Transaction tables not properly " +
"initialized, null record found in next_txn_id");
}
-
+ close(rs);
List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
//need the WHERE clause below to ensure consistent results with READ_COMMITTED
s = "select txn_id, txn_state, txn_user, txn_host from TXNS where txn_id <= " + hwm;
@@ -199,8 +212,7 @@ public class TxnHandler {
throw new MetaException("Unable to select from transaction database: " + getMessage(e)
+ StringUtils.stringifyException(e));
} finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ close(rs, stmt, dbConn);
}
} catch (RetryException e) {
return getOpenTxnsInfo();
@@ -215,12 +227,16 @@ public class TxnHandler {
// subsequently shows up in the open list that's ok.
Connection dbConn = null;
Statement stmt = null;
+ ResultSet rs = null;
try {
+ /**
+ * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
+\ */
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
String s = "select ntxn_next - 1 from NEXT_TXN_ID";
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction tables not properly " +
"initialized, no record found in next_txn_id");
@@ -230,7 +246,7 @@ public class TxnHandler {
throw new MetaException("Transaction tables not properly " +
"initialized, null record found in next_txn_id");
}
-
+ close(rs);
Set<Long> openList = new HashSet<Long>();
//need the WHERE clause below to ensure consistent results with READ_COMMITTED
s = "select txn_id from TXNS where txn_id <= " + hwm;
@@ -249,8 +265,7 @@ public class TxnHandler {
throw new MetaException("Unable to select from transaction database, "
+ StringUtils.stringifyException(e));
} finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ close(rs, stmt, dbConn);
}
} catch (RetryException e) {
return getOpenTxns();
@@ -284,17 +299,35 @@ public class TxnHandler {
try {
Connection dbConn = null;
Statement stmt = null;
+ ResultSet rs = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ /**
+ * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
+ * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
+ * Also, advancing the counter must work when multiple metastores are running, thus either
+ * SELECT ... FOR UPDATE is used or SERIALIZABLE isolation. The former is preferred since it prevents
+ * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
+ *
+ * In the current design, there can be several metastore instances running in a given Warehouse.
+ * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example,
+ * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
+ * Now the same client will start another transaction, except it ends up on MS2 and may get
+ * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot
+ * on read will thing the version of the row from transaction ID 500 is the latest one.
+ *
+ * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This
+ * set could support a write-through cache for added performance.
+ */
+ dbConn = getDbConn(getRequiredIsolationLevel());
// Make sure the user has not requested an insane amount of txns.
int maxTxns = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
if (numTxns > maxTxns) numTxns = maxTxns;
stmt = dbConn.createStatement();
- String s = "select ntxn_next from NEXT_TXN_ID";
+ String s = addForUpdateClause(dbConn, "select ntxn_next from NEXT_TXN_ID");
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
if (!rs.next()) {
throw new MetaException("Transaction database not properly " +
"configured, can't find next transaction id.");
@@ -312,10 +345,11 @@ public class TxnHandler {
List<Long> txnIds = new ArrayList<Long>(numTxns);
for (long i = first; i < first + numTxns; i++) {
ps.setLong(1, i);
+ //todo: this would be more efficient with a single insert with multiple rows in values()
+ //need add a safeguard to not exceed the DB capabilities.
ps.executeUpdate();
txnIds.add(i);
}
-
LOG.debug("Going to commit");
dbConn.commit();
return new OpenTxnsResponse(txnIds);
@@ -326,8 +360,7 @@ public class TxnHandler {
throw new MetaException("Unable to select from transaction database "
+ StringUtils.stringifyException(e));
} finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ close(rs, stmt, dbConn);
}
} catch (RetryException e) {
return openTxns(rqst);
@@ -369,6 +402,24 @@ public class TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
+ /**
+ * This has to run at SERIALIZABLE to make no concurrent attempt to acquire locks (insert into HIVE_LOCKS)
+ * can happen. Otherwise we may end up with orphaned locks. While lock() and commitTxn() should not
+ * normally run concurrently (for same txn) but could due to bugs in the client which could then
+ * (w/o SERIALIZABLE) corrupt internal transaction manager state. Also competes with abortTxn()
+ *
+ * Sketch of an improvement:
+ * Introduce a new transaction state in TXNS, state 'c'. This is a transient Committed state.
+ * commitTxn() would mark the txn 'c' in TXNS in an independent txn. Other operation like
+ * lock(), heartbeat(), etc would raise errors for txn in 'c' state and getOpenTxns(), etc would
+ * treat 'c' txn as 'open'. Then this method could run in READ COMMITTED since the
+ * entry for this txn in TXNS in 'c' acts like a monitor.
+ * Since the move to 'c' state is in one txn (to make it visible) and the rest of the
+ * operations in another (could even be made separate txns), there is a possibility of failure
+ * between the 2. Thus the AcidHouseKeeper logic to timeout txns should apply 'c' state txns.
+ *
+ * Or perhaps Select * TXNS where txn_id = " + txnid; for update
+ */
dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
stmt = dbConn.createStatement();
// Before we do the commit heartbeat the txn. This is slightly odd in that we're going to
@@ -423,7 +474,7 @@ public class TxnHandler {
Connection dbConn = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- return lock(dbConn, rqst, true);
+ return lock(dbConn, rqst);
} catch (SQLException e) {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
@@ -438,48 +489,49 @@ public class TxnHandler {
}
}
- public LockResponse lockNoWait(LockRequest rqst)
- throws NoSuchTxnException, TxnAbortedException, MetaException {
- try {
- Connection dbConn = null;
- try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- return lock(dbConn, rqst, false);
- } catch (SQLException e) {
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "lockNoWait(" + rqst + ")");
- throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
- } finally {
- closeDbConn(dbConn);
- }
- } catch (RetryException e) {
- return lockNoWait(rqst);
- }
- }
-
+ /**
+ * Why doesn't this get a txnid as parameter? The caller should either know the txnid or know there isn't one.
+ * Either way getTxnIdFromLockId() will not be needed. This would be a Thrift change.
+ *
+ * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(),
+ * in practice more often)
+ * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB.
+ *
+ * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
+ * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
+ *
+ * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking
+ * against doesn't move from W to A in another txn) but this method can heartbeat in
+ * separate txn at READ_COMMITTED.
+ */
public LockResponse checkLock(CheckLockRequest rqst)
throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
try {
Connection dbConn = null;
+ long extLockId = rqst.getLockid();
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- long extLockId = rqst.getLockid();
-
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
// Heartbeat on the lockid first, to assure that our lock is still valid.
// Then look up the lock info (hopefully in the cache). If these locks
// are associated with a transaction then heartbeat on that as well.
- heartbeatLock(dbConn, extLockId);
- long txnid = getTxnIdFromLockId(dbConn, extLockId);
- if (txnid > 0) heartbeatTxn(dbConn, txnid);
- return checkLock(dbConn, extLockId, true);
+ Long txnid = getTxnIdFromLockId(dbConn, extLockId);
+ if(txnid == null) {
+ throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+ }
+ if (txnid > 0) {
+ heartbeatTxn(dbConn, txnid);
+ }
+ else {
+ heartbeatLock(dbConn, extLockId);
+ }
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ return checkLock(dbConn, extLockId);
} catch (SQLException e) {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(dbConn, e, "checkLock(" + rqst + " )");
throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
+ JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
} finally {
closeDbConn(dbConn);
}
@@ -489,39 +541,56 @@ public class TxnHandler {
}
+ /**
+ * This would have been made simpler if all locks were associated with a txn. Then only txn needs to
+ * be heartbeated, committed, etc. no need for client to track individual locks.
+ */
public void unlock(UnlockRequest rqst)
throws NoSuchLockException, TxnOpenException, MetaException {
try {
Connection dbConn = null;
Statement stmt = null;
+ long extLockId = rqst.getLockid();
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- // Odd as it seems, we need to heartbeat first because this touches the
- // lock table and assures that our locks our still valid. If they are
- // not, this will throw an exception and the heartbeat will fail.
- long extLockId = rqst.getLockid();
- heartbeatLock(dbConn, extLockId);
- long txnid = getTxnIdFromLockId(dbConn, extLockId);
- // If there is a valid txnid, throw an exception,
- // as locks associated with transactions should be unlocked only when the
- // transaction is committed or aborted.
- if (txnid > 0) {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- String msg = "Unlocking locks associated with transaction" +
- " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " +
- "transaction " + JavaUtils.txnIdToString(txnid);
- LOG.error(msg);
- throw new TxnOpenException(msg);
- }
+ /**
+ * This method is logically like commit for read-only auto commit queries.
+ * READ_COMMITTED since this only has 1 delete statement and no new entries with the
+ * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are
+ * created in a single atomic operation.
+ * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)}
+ * but hl_lock_ext_id is not known until that method returns.
+ * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}
+ * but using SERIALIZABLE doesn't materially change the interaction.
+ * If "delete" stmt misses, additional logic is best effort to produce meaningful error msg.
+ */
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId;
+ //hl_txnid <> 0 means it's associated with a transaction
+ String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND hl_txnid = 0";
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
if (rc < 1) {
LOG.debug("Going to rollback");
dbConn.rollback();
- throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+ Long txnid = getTxnIdFromLockId(dbConn, extLockId);
+ if(txnid == null) {
+ LOG.error("No lock found for unlock(" + rqst + ")");
+ throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+ }
+ if(txnid != 0) {
+ String msg = "Unlocking locks associated with transaction" +
+ " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " +
+ "transaction " + JavaUtils.txnIdToString(txnid);
+ LOG.error(msg);
+ throw new TxnOpenException(msg);
+ }
+ if(txnid == 0) {
+ //we didn't see this lock when running DELETE stmt above but now it showed up
+ //so should "should never happen" happened...
+ String msg = "Found lock " + JavaUtils.lockIdToString(extLockId) + " with " + JavaUtils.txnIdToString(txnid);
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
}
LOG.debug("Going to commit");
dbConn.commit();
@@ -530,7 +599,7 @@ public class TxnHandler {
rollbackDBConn(dbConn);
checkRetryable(dbConn, e, "unlock(" + rqst + ")");
throw new MetaException("Unable to update transaction database " +
- StringUtils.stringifyException(e));
+ JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
} finally {
closeStmt(stmt);
closeDbConn(dbConn);
@@ -615,6 +684,10 @@ public class TxnHandler {
}
}
+ /**
+ * {@code ids} should only have txnid or lockid but not both, ideally.
+ * Currently DBTxnManager.heartbeat() enforces this.
+ */
public void heartbeat(HeartbeatRequest ids)
throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
try {
@@ -647,9 +720,17 @@ public class TxnHandler {
rsp.setNosuch(nosuch);
rsp.setAborted(aborted);
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ /**
+ * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)}
+ * only has 1 update statement in it and
+ * we only update existing txns, i.e. nothing can add additional txns that this operation
+ * would care about (which would have required SERIALIZABLE)
+ */
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
try {
+ //todo: this is expensive call: at least 2 update queries per txn
+ //is this really worth it?
heartbeatTxn(dbConn, txn);
} catch (NoSuchTxnException e) {
nosuch.add(txn);
@@ -678,11 +759,11 @@ public class TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(getRequiredIsolationLevel());
stmt = dbConn.createStatement();
// Get the id for the next entry in the queue
- String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID";
+ String s = addForUpdateClause(dbConn, "select ncq_next from NEXT_COMPACTION_QUEUE_ID");
LOG.debug("going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
@@ -1293,19 +1374,31 @@ public class TxnHandler {
}
}
+  /**
+   * Convenience overload that aborts {@code txnids} without the txn_last_heartbeat filter
+   * applied by the 3-argument overload (a non-positive max_heartbeat disables that filter).
+   * @return number of aborted transactions, as reported by the 3-argument overload
+   */
+  private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
+    final long noHeartbeatLimit = -1;
+    return abortTxns(dbConn, txnids, noHeartbeatLimit);
+  }
/**
- * Abort a group of txns
+ * TODO: expose this as an operation to client. Useful for streaming API to abort all remaining
+ * transactions in a batch on IOExceptions.
* @param dbConn An active connection
* @param txnids list of transactions to abort
+ * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't abort txns which were
+ * heartbeated between the #performTimeOuts() select and this operation; values <= 0 disable the check.
* @return Number of aborted transactions
* @throws SQLException
*/
- private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException {
+ private int abortTxns(Connection dbConn, List<Long> txnids, long max_heartbeat) throws SQLException {
Statement stmt = null;
int updateCnt = 0;
if (txnids.isEmpty()) {
return 0;
}
+ if(Connection.TRANSACTION_SERIALIZABLE != dbConn.getTransactionIsolation()) {
+ /** Running this at SERIALIZABLE prevents new locks being added for this txnid(s) concurrently
+ * which would cause them to become orphaned.
+ */
+ throw new IllegalStateException("Expected SERIALIZABLE isolation. Found " + dbConn.getTransactionIsolation());
+ }
try {
stmt = dbConn.createStatement();
@@ -1321,6 +1414,8 @@ public class TxnHandler {
LOG.debug("Going to execute update <" + buf.toString() + ">");
stmt.executeUpdate(buf.toString());
+ //todo: seems like we should do this first and if it misses, don't bother with
+ //delete from HIVE_LOCKS since it will be rolled back
buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED +
"' where txn_state = '" + TXN_OPEN + "' and txn_id in (");
first = true;
@@ -1330,6 +1425,9 @@ public class TxnHandler {
buf.append(id);
}
buf.append(')');
+ if(max_heartbeat > 0) {
+ buf.append(" and txn_last_heartbeat < ").append(max_heartbeat);
+ }
LOG.debug("Going to execute update <" + buf.toString() + ">");
updateCnt = stmt.executeUpdate(buf.toString());
@@ -1340,22 +1438,33 @@ public class TxnHandler {
}
/**
+ * Isolation Level Notes:
+ * Run at SERIALIZABLE to make sure no one is adding new locks while we are checking conflicts here.
+ *
+ * Ramblings:
+ * We could perhaps get away with writing to TXN_COMPONENTS + HIVE_LOCKS in 1 txn@RC
+ * since this is just in Wait state.
+ * (Then we'd need to ensure that in !wait case we don't rely on rollback and again in case of
+ * failure, the W locks will timeout if failure does not propagate to client in some way, or it
+ * will and client will Abort).
+ * Actually, whether we can do this depends on what happens when you try to get a lock and notice
+ * a conflicting lock in W mode: do we wait in this case? If so, it's a problem because while you
+ * are checking new locks someone may insert new W locks that you don't see...
+ * On the other hand, this attempts to be 'fair', i.e. process locks in order, so could we assume
+ * that additional W locks will have higher IDs?
+ *
+ * We can use Select for Update to generate the next LockID. In fact we can easily do this in a separate txn.
+ * (NOTE: the comment on the NEXT_LOCK_ID select in the body of this method explains a race that makes
+ * a separate ID-generation txn unsafe as currently written.)
+ * This avoids contention on NEXT_LOCK_ID. The rest of the logic will still need to be done at Serializable, I think,
+ * but it will not be updating the same row from 2 DB transactions.
+ *
* Request a lock
* @param dbConn database connection
* @param rqst lock information
- * @param wait whether to wait for this lock. The function will return immediately one way or
- * another. If true and the lock could not be acquired the response will have a
- * state of WAITING. The caller will then need to poll using
- * {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}. If
- * false and the lock could not be acquired, then the response will have a state
- * of NOT_ACQUIRED. The caller will need to call
- * {@link #lockNoWait(org.apache.hadoop.hive.metastore.api.LockRequest)} again to
- * attempt another lock.
* @return information on whether the lock was acquired.
* @throws NoSuchTxnException
* @throws TxnAbortedException
*/
- private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait)
+ private LockResponse lock(Connection dbConn, LockRequest rqst)
throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
// We want to minimize the number of concurrent lock requests being issued. If we do not we
// get a large number of deadlocks in the database, since this method has to both clean
@@ -1368,13 +1477,25 @@ public class TxnHandler {
// etc.) that should not interfere with this one.
synchronized (lockLock) {
Statement stmt = null;
+ ResultSet rs = null;
try {
+ long txnid = rqst.getTxnid();
+ if (txnid > 0) {
+ // Heartbeat the transaction so we know it is valid and we avoid it timing out while we
+ // are locking.
+ heartbeatTxn(dbConn, txnid);
+ }
stmt = dbConn.createStatement();
- // Get the next lock id.
- String s = "select nl_next from NEXT_LOCK_ID";
+ /** Get the next lock id.
+ * This has to be atomic with adding entries to HIVE_LOCKS (1st added in W state) to prevent a race.
+ * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7,
+ * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks,
+ * and adds its W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)}
+ * doesn't block on locks acquired later than the one it's checking*/
+ String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
if (!rs.next()) {
LOG.debug("Going to rollback");
dbConn.rollback();
@@ -1385,18 +1506,19 @@ public class TxnHandler {
s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
- LOG.debug("Going to commit.");
- dbConn.commit();
- long txnid = rqst.getTxnid();
if (txnid > 0) {
- // Heartbeat the transaction so we know it is valid and we avoid it timing out while we
- // are locking.
- heartbeatTxn(dbConn, txnid);
-
// For each component in this lock request,
// add an entry to the txn_components table
// This must be done before HIVE_LOCKS is accessed
+
+ //Isolation note:
+ //the !wait option is not actually used anywhere. W/o that,
+ // if we make CompactionTxnHandler.markCleaned() not delete anything above a certain txn_id
+ //then there is no reason why this insert into TXN_COMPONENTS needs to run at Serializable.
+ //
+ // Again, w/o the !wait option, insert into HIVE_LOCKS should be OK at READ_COMMITTED as long
+ //as check lock is at serializable (or any other way to make sure it's exclusive)
for (LockComponent lc : rqst.getComponent()) {
String dbName = lc.getDbname();
String tblName = lc.getTablename();
@@ -1429,34 +1551,42 @@ public class TxnHandler {
" (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
"hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
" values (" + extLockId + ", " +
- + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" +
+ + intLockId + "," + txnid + ", '" +
dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" )
+ ", " + (partName == null ? "null" : "'" + partName + "'") +
- ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" +
+ ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " +
+ //for locks associated with a txn, we always heartbeat txn and timeout based on that
+ (isValidTxn(txnid) ? 0 : now) + ", '" +
rqst.getUser() + "', '" + rqst.getHostname() + "')";
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
}
- LockResponse rsp = checkLock(dbConn, extLockId, wait);
- if (!wait && rsp.getState() != LockState.ACQUIRED) {
- LOG.debug("Lock not acquired, going to rollback");
- dbConn.rollback();
- rsp = new LockResponse();
- rsp.setState(LockState.NOT_ACQUIRED);
- }
- return rsp;
+ /**to make txns shorter we could commit here and start a new txn for checkLock. This would
+ * require moving checkRetryable() down into here. Could we then run the part before this
+ * commit at READ_COMMITTED?*/
+ return checkLock(dbConn, extLockId);
} catch (NoSuchLockException e) {
// This should never happen, as we just added the lock id
throw new MetaException("Couldn't find a lock we just created!");
} finally {
+ close(rs);
closeStmt(stmt);
}
}
}
-
+  /**
+   * Tells whether {@code txnId} identifies a transaction. An id of 0 means "no transaction",
+   * matching the hl_txnid = 0 convention used for locks that are not part of a txn.
+   */
+  private static boolean isValidTxn(long txnId) {
+    final long noTxn = 0L;
+    return noTxn != txnId;
+  }
+ /**
+ * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take
+ * all locks for given extLockId or none. Would be more efficient to update state on all locks
+ * at once. Semantics are the same since this is all part of the same txn@serializable.
+ *
+ * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
+ * hl_lock_ext_id by only checking earlier locks.
+ */
private LockResponse checkLock(Connection dbConn,
- long extLockId,
- boolean alwaysCommit)
+ long extLockId)
throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
LockResponse response = new LockResponse();
@@ -1609,19 +1739,15 @@ public class TxnHandler {
case WAIT:
if(!ignoreConflict(info, locks[i])) {
wait(dbConn, save);
- if (alwaysCommit) {
- // In the case where lockNoWait has been called we don't want to commit because
- // it's going to roll everything back. In every other case we want to commit here.
- LOG.debug("Going to commit");
- dbConn.commit();
- }
+ LOG.debug("Going to commit");
+ dbConn.commit();
response.setState(LockState.WAITING);
LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")");
return response;
}
//fall through to ACQUIRE
case ACQUIRE:
- acquire(dbConn, stmt, extLockId, info.intLockId);
+ acquire(dbConn, stmt, extLockId, info);
acquired = true;
break;
case KEEP_LOOKING:
@@ -1633,7 +1759,7 @@ public class TxnHandler {
// If we've arrived here and we have not already acquired, it means there's nothing in the
// way of the lock, so acquire the lock.
- if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId);
+ if (!acquired) acquire(dbConn, stmt, extLockId, info);
}
// We acquired all of the locks, so commit and return acquired.
@@ -1677,26 +1803,31 @@ public class TxnHandler {
dbConn.rollback(save);
}
- private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId)
+ private void acquire(Connection dbConn, Statement stmt, long extLockId, LockInfo lockInfo)
throws SQLException, NoSuchLockException, MetaException {
long now = getDbTime(dbConn);
String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
- "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
- extLockId + " and hl_lock_int_id = " + intLockId;
+ //if lock is part of txn, heartbeat info is in txn record
+ "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) +
+ ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+ extLockId + " and hl_lock_int_id = " + lockInfo.intLockId;
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
if (rc < 1) {
LOG.debug("Going to rollback");
dbConn.rollback();
throw new NoSuchLockException("No such lock: (" + JavaUtils.lockIdToString(extLockId) + "," +
- + intLockId + ")");
+ + lockInfo.intLockId + ") " + JavaUtils.txnIdToString(lockInfo.txnId));
}
// We update the database, but we don't commit because there may be other
// locks together with this, and we only want to acquire one if we can
// acquire all.
}
- // Heartbeats on the lock table. This commits, so do not enter it with any state
+ /**
+ * Heartbeats on the lock table. This commits, so do not enter it with any state.
+ * Should not be called on a lock that belongs to transaction.
+ */
private void heartbeatLock(Connection dbConn, long extLockId)
throws NoSuchLockException, SQLException, MetaException {
// If the lock id is 0, then there are no locks in this heartbeat
@@ -1731,7 +1862,6 @@ public class TxnHandler {
try {
stmt = dbConn.createStatement();
long now = getDbTime(dbConn);
- ensureValidTxn(dbConn, txnid, stmt);
String s = "update TXNS set txn_last_heartbeat = " + now +
" where txn_id = " + txnid + " and txn_state = '" + TXN_OPEN + "'";
LOG.debug("Going to execute update <" + s + ">");
@@ -1742,10 +1872,6 @@ public class TxnHandler {
dbConn.rollback();
throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid));
}
- //update locks for this txn to the same heartbeat
- s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid;
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
LOG.debug("Going to commit");
dbConn.commit();
} finally {
@@ -1760,6 +1886,7 @@ public class TxnHandler {
LOG.debug("Going to execute query <" + s + ">");
ResultSet rs = stmt.executeQuery(s);
if (!rs.next()) {
+ //todo: add LIMIT 1 instead of count - should be more efficient
s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid;
ResultSet rs2 = stmt.executeQuery(s);
boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0;
@@ -1775,28 +1902,28 @@ public class TxnHandler {
LOG.debug("Going to rollback");
dbConn.rollback();
throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) +
- " already aborted");//todo: add time of abort, which is not currently tracked
+ " already aborted");//todo: add time of abort, which is not currently tracked. Requires schema change
}
}
- // NEVER call this function without first calling heartbeat(long, long)
- private long getTxnIdFromLockId(Connection dbConn, long extLockId)
+ private Long getTxnIdFromLockId(Connection dbConn, long extLockId)
throws NoSuchLockException, MetaException, SQLException {
Statement stmt = null;
+ ResultSet rs = null;
try {
stmt = dbConn.createStatement();
String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
extLockId;
LOG.debug("Going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ rs = stmt.executeQuery(s);
if (!rs.next()) {
- throw new MetaException("This should never happen! We already " +
- "checked the lock existed but now we can't find it!");
+ return null;
}
long txnid = rs.getLong(1);
- LOG.debug("Return " + JavaUtils.txnIdToString(rs.wasNull() ? -1 : txnid));
- return (rs.wasNull() ? -1 : txnid);
+ LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(txnid));
+ return txnid;
} finally {
+ close(rs);
closeStmt(stmt);
}
}
@@ -1832,14 +1959,13 @@ public class TxnHandler {
// for read-only autoCommit=true statements. This does a commit,
// and thus should be done before any calls to heartbeat that will leave
// open transactions.
- private void timeOutLocks(Connection dbConn) {
+ private void timeOutLocks(Connection dbConn, long now) {
Statement stmt = null;
try {
- long now = getDbTime(dbConn);
stmt = dbConn.createStatement();
// Remove any timed out locks from the table.
String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
- (now - timeout) + " and (hl_txnid = 0 or hl_txnid is NULL)";//when txnid is > 0, the lock is
+ (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is
//associated with a txn and is handled by performTimeOuts()
//want to avoid expiring locks for a txn w/o expiring the txn itself
LOG.debug("Going to execute update <" + s + ">");
@@ -1891,6 +2017,8 @@ public class TxnHandler {
}
}
/**
+ * Isolation Level Notes
+ * Plain: RC is OK
* This will find transactions that have timed out and abort them.
* Will also delete locks which are not associated with a transaction and have timed out
* Tries to keep transactions (against metastore db) small to reduce lock contention.
@@ -1900,9 +2028,19 @@ public class TxnHandler {
Statement stmt = null;
ResultSet rs = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ //We currently commit after selecting the TXNS to abort. So whether SERIALIZABLE or
+ //READ_COMMITTED, the effect is the same. We could use FOR UPDATE on Select from TXNS
+ //and do the whole performTimeOuts() in a single huge transaction, but the only benefit
+ //would be to make sure someone cannot heartbeat one of these txns at the same time.
+ //The attempt to heartbeat would block and fail immediately after it's unblocked.
+ //With current (RC + multiple txns) implementation it is possible for someone to send
+ //heartbeat at the very end of the expire interval, and just after the Select from TXNS
+ //is made, in which case heartbeat will succeed but txn will still be Aborted.
+ //Solving this corner case is not worth the perf penalty. The client should heartbeat in a
+ //timely way.
long now = getDbTime(dbConn);
- timeOutLocks(dbConn);
+ timeOutLocks(dbConn, now);
while(true) {
stmt = dbConn.createStatement();
String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN +
@@ -1923,16 +2061,26 @@ public class TxnHandler {
timedOutTxns.add(currentBatch);
}
} while(rs.next());
- close(rs, stmt, null);
dbConn.commit();
+ close(rs, stmt, dbConn);
+ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ int numTxnsAborted = 0;
for(List<Long> batchToAbort : timedOutTxns) {
- abortTxns(dbConn, batchToAbort);
- dbConn.commit();
- //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
- LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString());
+ if(abortTxns(dbConn, batchToAbort, now - timeout) == batchToAbort.size()) {
+ dbConn.commit();
+ numTxnsAborted += batchToAbort.size();
+ //todo: add TXNS.COMMENT field and set it to 'aborted by system due to timeout'
+ LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString());
+ }
+ else {
+ //could not abort all txns in this batch - this may happen because in parallel with this
+ //operation there was activity on one of the txns in this batch (commit/abort/heartbeat)
+ //This is not likely but may happen if client experiences long pause between heartbeats or
+ //unusually long/extreme pauses between heartbeat() calls and other logic in checkLock(),
+ //lock(), etc.
+ dbConn.rollback();
+ }
}
- int numTxnsAborted = (timedOutTxns.size() - 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE +
- timedOutTxns.get(timedOutTxns.size() - 1).size();
LOG.info("Aborted " + numTxnsAborted + " transactions due to timeout");
}
} catch (SQLException ex) {
@@ -2110,4 +2258,97 @@ public class TxnHandler {
private static String getMessage(SQLException ex) {
return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")";
}
+  /**
+   * Returns the JDBC isolation level at which operations that use
+   * {@link #addForUpdateClause(java.sql.Connection, String)} must run: either
+   * {@link java.sql.Connection#TRANSACTION_SERIALIZABLE} or
+   * {@link java.sql.Connection#TRANSACTION_READ_COMMITTED}.
+   * Different DBs support different concurrency management options. This class relies on
+   * SELECT ... FOR UPDATE functionality; where that is not available (Derby), SERIALIZABLE is used.
+   * This method must always agree with {@link #addForUpdateClause(java.sql.Connection, String)}:
+   * if FOR UPDATE is not available, the operation must run at SERIALIZABLE.
+   *
+   * @return a {@code java.sql.Connection.TRANSACTION_*} constant
+   * @throws MetaException if the database product is not recognized
+   * @throws SQLException if the connection used to detect the database product cannot be obtained
+   */
+  private int getRequiredIsolationLevel() throws MetaException, SQLException {
+    if (dbProduct == null) {
+      // Lazily detect the DB product; determineDatabaseProduct() is expected to populate dbProduct.
+      // Close the temporary connection even if detection throws, otherwise it is never closed.
+      Connection tmp = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      try {
+        determineDatabaseProduct(tmp);
+      } finally {
+        closeDbConn(tmp);
+      }
+    }
+    switch (dbProduct) {
+      case DERBY:
+        return Connection.TRANSACTION_SERIALIZABLE;
+      case MYSQL:
+      case ORACLE:
+      case POSTGRES:
+      case SQLSERVER:
+        return Connection.TRANSACTION_READ_COMMITTED;
+      default:
+        String msg = "Unrecognized database product name <" + dbProduct + ">";
+        LOG.error(msg);
+        throw new MetaException(msg);
+    }
+  }
+  /**
+   * Decorates {@code selectStatement} with FOR UPDATE or a semantically equivalent construct, so
+   * that the rows it reads are locked until the current DB transaction ends. If the DB doesn't
+   * support that (Derby), the original statement is returned unchanged and the caller must run at
+   * SERIALIZABLE instead. This method must always agree with {@link #getRequiredIsolationLevel()}.
+   *
+   * NOTE(review): for SQL Server the table hint is appended to the very end of the statement. That
+   * is only valid SQL when the statement ends with the table name (no WHERE, ORDER BY, etc.). The
+   * callers visible in this change all pass "select ... from TABLE"; keep it that way or move the
+   * hint to follow the table name.
+   *
+   * @throws MetaException if the database product is not recognized
+   */
+  private String addForUpdateClause(Connection dbConn, String selectStatement) throws MetaException {
+    DatabaseProduct product = determineDatabaseProduct(dbConn);
+    String lockingSuffix;
+    switch (product) {
+      case DERBY:
+        // https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
+        // In Derby, FOR UPDATE doesn't mean what it should, so rely on SERIALIZABLE instead.
+        return selectStatement;
+      case MYSQL:     // http://dev.mysql.com/doc/refman/5.7/en/select.html
+      case ORACLE:    // Oracle Database SQL Language Reference: SELECT, for_update_clause
+      case POSTGRES:  // http://www.postgresql.org/docs/9.0/static/sql-select.html
+        lockingSuffix = " for update";
+        break;
+      case SQLSERVER:
+        // https://msdn.microsoft.com/en-us/library/ms189499.aspx
+        // https://msdn.microsoft.com/en-us/library/ms187373.aspx
+        lockingSuffix = " with(updlock)";
+        break;
+      default:
+        String msg = "Unrecognized database product name <" + product + ">";
+        LOG.error(msg);
+        throw new MetaException(msg);
+    }
+    return selectStatement + lockingSuffix;
+  }
+  /**
+   * Allocates a new external lock id from NEXT_LOCK_ID in its own, short DB transaction.
+   * The counter row is read via {@link #addForUpdateClause(java.sql.Connection, String)} at the
+   * isolation level from {@link #getRequiredIsolationLevel()}, so concurrent callers cannot read
+   * and increment the same value.
+   * The caller is expected to retry if this fails.
+   *
+   * @return the newly allocated external lock id
+   * @throws SQLException on any database error; the DB transaction is explicitly rolled back first
+   * @throws MetaException if NEXT_LOCK_ID has no row, or the database product is not recognized
+   */
+  private long generateNewExtLockId() throws SQLException, MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      dbConn = getDbConn(getRequiredIsolationLevel());
+      stmt = dbConn.createStatement();
+
+      // Get the next lock id. The locking select holds the NEXT_LOCK_ID row until commit/rollback.
+      String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID");
+      LOG.debug("Going to execute query <" + s + ">");
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        throw new MetaException("Transaction tables not properly " +
+          "initialized, no record found in next_lock_id");
+      }
+      long extLockId = rs.getLong(1);
+      s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
+      LOG.debug("Going to commit.");
+      dbConn.commit();
+      return extLockId;
+    } catch (SQLException e) {
+      // Release the row lock explicitly: whether Connection.close() commits or rolls back an
+      // open transaction is driver-specific, and the caller is expected to retry.
+      if (dbConn != null) {
+        rollbackDBConn(dbConn);
+      }
+      throw e;
+    } finally {
+      close(rs, stmt, dbConn);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index ad99427..32c3d80 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -264,43 +264,6 @@ public class TestCompactionTxnHandler {
}
@Test
- public void testLockNoWait() throws Exception {
- // Test that we can acquire the lock alone
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lockNoWait(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.unlock(new UnlockRequest(res.getLockid()));
-
- // test that another lock blocks it
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertEquals(LockState.ACQUIRED, res.getState());
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lockNoWait(req);
- assertEquals(LockState.NOT_ACQUIRED, res.getState());
- assertEquals(1, TxnDbUtil.findNumCurrentLocks());
- }
-
- @Test
public void testFindPotentialCompactions() throws Exception {
// Test that committing unlocks
long txnid = openTxn();
http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 6461435..e53daae 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1067,8 +1067,8 @@ public class TestTxnHandler {
assertNull(lock.getPartname());
assertEquals(LockState.ACQUIRED, lock.getState());
assertEquals(LockType.EXCLUSIVE, lock.getType());
- assertTrue(begining <= lock.getLastheartbeat() &&
- System.currentTimeMillis() >= lock.getLastheartbeat());
+ assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
+ lock.getTxnid() != 0);
assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
+ " and " + System.currentTimeMillis(),
begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
@@ -1082,8 +1082,8 @@ public class TestTxnHandler {
assertNull(lock.getPartname());
assertEquals(LockState.WAITING, lock.getState());
assertEquals(LockType.SHARED_READ, lock.getType());
- assertTrue(begining <= lock.getLastheartbeat() &&
- System.currentTimeMillis() >= lock.getLastheartbeat());
+ assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
+ lock.getTxnid() != 0);
assertEquals(0, lock.getAcquiredat());
assertEquals("me", lock.getUser());
assertEquals("localhost", lock.getHostname());
@@ -1095,7 +1095,7 @@ public class TestTxnHandler {
assertEquals("yourpartition", lock.getPartname());
assertEquals(LockState.ACQUIRED, lock.getState());
assertEquals(LockType.SHARED_WRITE, lock.getType());
- assertTrue(begining <= lock.getLastheartbeat() &&
+ assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
System.currentTimeMillis() >= lock.getLastheartbeat());
assertTrue(begining <= lock.getAcquiredat() &&
System.currentTimeMillis() >= lock.getAcquiredat());
http://git-wip-us.apache.org/repos/asf/hive/blob/a80841b7/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 33c6ab5..c134653 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1028,7 +1028,7 @@ public class Driver implements CommandProcessor {
// don't update it after that until txn completes. Thus the check for {@code initiatingTransaction}
//For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
//for each statement.
- recordValidTxns();
+ recordValidTxns();//todo: we should only need to do this for RO query if it has ACID resources in it.
}
return 0;