You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/05/19 19:29:21 UTC
[2/6] hive git commit: HIVE-13622 WriteSet tracking optimizations
(Eugene Koifman, reviewed by Alan Gates)
http://git-wip-us.apache.org/repos/asf/hive/blob/f25b8652/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig
new file mode 100644
index 0000000..bc818e0
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java.orig
@@ -0,0 +1,3233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Service;
+import com.jolbox.bonecp.BoneCPConfig;
+import com.jolbox.bonecp.BoneCPDataSource;
+import org.apache.commons.dbcp.ConnectionFactory;
+import org.apache.commons.dbcp.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hive.common.ServerUtils;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.dbcp.PoolingDataSource;
+
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.util.StringUtils;
+
+import javax.sql.DataSource;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+/**
+ * A handler to answer transaction related calls that come into the metastore
+ * server.
+ *
+ * Note on log messages: Please include txnid:X and lockid info using
+ * {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)}
+ * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages.
+ * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
+ * so keeping the format consistent makes grep'ing the logs much easier.
+ *
+ * Note on HIVE_LOCKS.hl_last_heartbeat.
+ * For locks that are part of transaction, we set this 0 (would rather set it to NULL but
+ * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
+ * transaction in TXNS.
+ *
+ * In general there can be multiple metastores where this logic can execute, thus the DB is
+ * used to ensure proper mutexing of operations.
+ * Select ... For Update (or equivalent: either MsSql with(updlock) or actual Update stmt) is
+ * used to properly sequence operations. Most notably:
+ * 1. various sequence IDs are generated with aid of this mutex
+ * 2. ensuring that each (Hive) Transaction state is transitioned atomically. Transaction state
+ * includes its actual state (Open, Aborted) as well as its lock list/component list. Thus all
+ * per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row.
+ * This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
+ * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock
+ * can be granted, no other (strictly speaking "earlier") lock can change state.
+ *
+ * The exception to this is Derby which doesn't support proper S4U. Derby is always running embedded
+ * (this is the only supported configuration for Derby)
+ * in the same JVM as HiveMetaStoreHandler thus we use JVM wide lock to properly sequence the operations.
+ *
+ * {@link #derbyLock}
+ *
+ * If we ever decide to run remote Derby server, according to
+ * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be
+ * serialized, so that would also work though has not been tested.
+ *
+ * General design note:
+ * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is
+ * still valid and active. In the code this is usually achieved at the same time the txn record
+ * is locked for some operation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
+
+  // Single-character state codes.
+  // NOTE(review): presumably compaction request states stored in the DB - confirm against the schema.
+  protected static final char INITIATED_STATE = 'i';
+  protected static final char WORKING_STATE = 'w';
+  protected static final char READY_FOR_CLEANING = 'r';
+  static final char FAILED_STATE = 'f';
+  static final char SUCCEEDED_STATE = 's';
+  static final char ATTEMPTED_STATE = 'a';
+
+  // Compactor types
+  protected static final char MAJOR_TYPE = 'a';
+  protected static final char MINOR_TYPE = 'i';
+
+  // Transaction states
+  protected static final char TXN_ABORTED = 'a';
+  protected static final char TXN_OPEN = 'o';
+
+  // Lock states
+  protected static final char LOCK_ACQUIRED = 'a';
+  protected static final char LOCK_WAITING = 'w';
+
+  // Lock types
+  protected static final char LOCK_EXCLUSIVE = 'e';
+  protected static final char LOCK_SHARED = 'r';
+  protected static final char LOCK_SEMI_SHARED = 'w';
+
+  private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
+  private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
+
+  // Shared by every TxnHandler instance in this JVM.
+  private static DataSource connPool;
+  private static boolean doRetryOnConnPool = false;
+
+  /**
+   * Kind of write operation; each constant carries the single-character code that
+   * represents it in SQL text.
+   */
+  private enum OpertaionType {
+    INSERT('i'), UPDATE('u'), DELETE('d');
+
+    /** One-character code used for this operation in SQL statements. */
+    private final char sqlConst;
+
+    OpertaionType(char sqlConst) {
+      this.sqlConst = sqlConst;
+    }
+
+    /**
+     * @return the one-character SQL code of this operation, as a String
+     */
+    @Override
+    public String toString() {
+      return String.valueOf(sqlConst);
+    }
+
+    /**
+     * Maps a one-character SQL code back to its constant.
+     *
+     * @param sqlConst one of 'i', 'u', 'd'
+     * @return the matching constant
+     * @throws IllegalArgumentException if the code matches no constant
+     */
+    public static OpertaionType fromString(char sqlConst) {
+      for (OpertaionType candidate : values()) {
+        if (candidate.sqlConst == sqlConst) {
+          return candidate;
+        }
+      }
+      throw new IllegalArgumentException(quoteChar(sqlConst));
+    }
+
+    /**
+     * Derives the operation from a lock type: SHARED_READ maps to INSERT and SHARED_WRITE
+     * to UPDATE; every other lock type is rejected.
+     *
+     * @throws IllegalArgumentException for any other lock type
+     */
+    //we should instead just pass in OpertaionType from client (HIVE-13622)
+    @Deprecated
+    public static OpertaionType fromLockType(LockType lockType) {
+      switch (lockType) {
+        case SHARED_READ:
+          return INSERT;
+        case SHARED_WRITE:
+          return UPDATE;
+        default:
+          break;
+      }
+      throw new IllegalArgumentException("Unexpected lock type: " + lockType);
+    }
+  }
+
+  /** Number of consecutive deadlocks we have seen. */
+  private int deadlockCnt;
+  /** Set in setConf() to one tenth of {@link #retryInterval}. */
+  private long deadlockRetryInterval;
+  /** Configuration handed to setConf(). */
+  protected HiveConf conf;
+  protected DatabaseProduct dbProduct;
+
+  /** (End user) transaction timeout, in milliseconds; read from HIVE_TXN_TIMEOUT. */
+  private long timeout;
+
+  /** Quotes to use for quoting tables, where necessary. */
+  private String identifierQuoteString;
+  /** Read from HMSHANDLERINTERVAL in setConf(), in milliseconds. */
+  private long retryInterval;
+  /** Read from HMSHANDLERATTEMPTS in setConf(). */
+  private int retryLimit;
+  // NOTE(review): presumably the number of retries attempted so far - usage is not visible here.
+  private int retryNum;
+
+  /** Derby specific concurrency control (created as a fair lock). */
+  private static final ReentrantLock derbyLock = new ReentrantLock(true);
+  /**
+   * Must be static since even in UT there may be > 1 instance of TxnHandler
+   * (e.g. via Compactor services).
+   */
+  private static final ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
+  private static final String hostname = ServerUtils.hostname();
+
+ // Private methods should never catch SQLException and then throw MetaException. The public
+ // methods depend on SQLException coming back so they can detect and handle deadlocks. Private
+ // methods should only throw MetaException when they explicitly know there's a logic error and
+ // they want to throw past the public methods.
+ //
+ // All public methods that write to the database have to check for deadlocks when a SQLException
+ // comes back and handle it if they see one. This has to be done with the connection pooling
+ // in mind. To do this they should call checkRetryable() AFTER rolling back the db transaction,
+ // and then they should catch RetryException and call themselves recursively. See commitTxn for an example.
+
+ /**
+ * No-arg constructor; performs no initialization itself.
+ * {@link #setConf(HiveConf)} must be called before any other method is used.
+ */
+ public TxnHandler() {
+ }
+
+ /**
+ * This is logically part of c'tor and must be called prior to any other method.
+ * Not physically part of c'tor due to use of relfection
+ */
+ public void setConf(HiveConf conf) {
+ this.conf = conf;
+
+ checkQFileTestHack();
+
+ Connection dbConn = null;
+ // Set up the JDBC connection pool
+ try {
+ setupJdbcConnectionPool(conf);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ determineDatabaseProduct(dbConn);
+ } catch (SQLException e) {
+ String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage();
+ LOG.error(msg);
+ throw new RuntimeException(e);
+ }
+ finally {
+ closeDbConn(dbConn);
+ }
+
+ timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+ buildJumpTable();
+ retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL,
+ TimeUnit.MILLISECONDS);
+ retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
+ deadlockRetryInterval = retryInterval / 10;
+ }
+
+  /**
+   * Returns the transaction high water mark (NEXT_TXN_ID.ntxn_next - 1) together with the
+   * id, state, user and host of every TXNS row whose id is at or below that mark.
+   *
+   * @return the high water mark and the per-transaction details
+   * @throws MetaException if NEXT_TXN_ID is empty or holds a null value, if a TXNS row has
+   *                       an unexpected state, or if the query fails and is not retried
+   */
+  public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
+    try {
+      // We need to figure out the current transaction number and the list of
+      // open transactions. To avoid needing a transaction on the underlying
+      // database we'll look at the current transaction number first. If it
+      // subsequently shows up in the open list that's ok.
+      Connection dbConn = null;
+      Statement stmt = null;
+      ResultSet rs = null;
+      try {
+        /**
+         * This method can run at READ_COMMITTED as long as
+         * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
+         * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
+         * adding corresponding entries into TXNS. The reason is that any txnid below HWM
+         * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
+         */
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        String hwmQuery = "select ntxn_next - 1 from NEXT_TXN_ID";
+        LOG.debug("Going to execute query <" + hwmQuery + ">");
+        rs = stmt.executeQuery(hwmQuery);
+        if (!rs.next()) {
+          throw new MetaException(
+              "Transaction tables not properly initialized, no record found in next_txn_id");
+        }
+        long hwm = rs.getLong(1);
+        if (rs.wasNull()) {
+          throw new MetaException(
+              "Transaction tables not properly initialized, null record found in next_txn_id");
+        }
+        close(rs);
+
+        List<TxnInfo> txnInfo = new ArrayList<TxnInfo>();
+        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
+        String txnQuery =
+            "select txn_id, txn_state, txn_user, txn_host from TXNS where txn_id <= " + hwm;
+        LOG.debug("Going to execute query<" + txnQuery + ">");
+        rs = stmt.executeQuery(txnQuery);
+        while (rs.next()) {
+          char stateCode = rs.getString(2).charAt(0);
+          TxnState state;
+          if (stateCode == TXN_ABORTED) {
+            state = TxnState.ABORTED;
+          } else if (stateCode == TXN_OPEN) {
+            state = TxnState.OPEN;
+          } else {
+            throw new MetaException(
+                "Unexpected transaction state " + stateCode + " found in txns table");
+          }
+          txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4)));
+        }
+        // Nothing was written; roll back to end the DB transaction.
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        return new GetOpenTxnsInfoResponse(hwm, txnInfo);
+      } catch (SQLException sqlEx) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        // Throws RetryException when the failure should be retried.
+        checkRetryable(dbConn, sqlEx, "getOpenTxnsInfo");
+        throw new MetaException("Unable to select from transaction database: "
+            + getMessage(sqlEx) + StringUtils.stringifyException(sqlEx));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException retry) {
+      return getOpenTxnsInfo();
+    }
+  }
+
+  /**
+   * Returns the transaction high water mark (NEXT_TXN_ID.ntxn_next - 1) together with the
+   * ids of all TXNS rows at or below that mark.
+   *
+   * @return the high water mark and the set of transaction ids found in TXNS
+   * @throws MetaException if NEXT_TXN_ID is empty or holds a null value, or if the query
+   *                       fails and is not retried
+   */
+  public GetOpenTxnsResponse getOpenTxns() throws MetaException {
+    try {
+      // We need to figure out the current transaction number and the list of
+      // open transactions. To avoid needing a transaction on the underlying
+      // database we'll look at the current transaction number first. If it
+      // subsequently shows up in the open list that's ok.
+      Connection dbConn = null;
+      Statement stmt = null;
+      ResultSet rs = null;
+      try {
+        /**
+         * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
+         */
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        String hwmQuery = "select ntxn_next - 1 from NEXT_TXN_ID";
+        LOG.debug("Going to execute query <" + hwmQuery + ">");
+        rs = stmt.executeQuery(hwmQuery);
+        if (!rs.next()) {
+          throw new MetaException(
+              "Transaction tables not properly initialized, no record found in next_txn_id");
+        }
+        long hwm = rs.getLong(1);
+        if (rs.wasNull()) {
+          throw new MetaException(
+              "Transaction tables not properly initialized, null record found in next_txn_id");
+        }
+        close(rs);
+
+        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
+        String openQuery = "select txn_id from TXNS where txn_id <= " + hwm;
+        LOG.debug("Going to execute query<" + openQuery + ">");
+        rs = stmt.executeQuery(openQuery);
+        Set<Long> openList = new HashSet<Long>();
+        while (rs.next()) {
+          openList.add(rs.getLong(1));
+        }
+        // Nothing was written; roll back to end the DB transaction.
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        return new GetOpenTxnsResponse(hwm, openList);
+      } catch (SQLException sqlEx) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        // Throws RetryException when the failure should be retried.
+        checkRetryable(dbConn, sqlEx, "getOpenTxns");
+        throw new MetaException("Unable to select from transaction database, "
+            + StringUtils.stringifyException(sqlEx));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException retry) {
+      return getOpenTxns();
+    }
+  }
+ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
+ int numTxns = rqst.getNum_txns();
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ lockInternal();
+ /**
+ * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
+ * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
+ * Also, advancing the counter must work when multiple metastores are running.
+ * SELECT ... FOR UPDATE is used to prevent
+ * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
+ *
+ * In the current design, there can be several metastore instances running in a given Warehouse.
+ * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example,
+ * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
+ * Now the same client will start another transaction, except it ends up on MS2 and may get
+ * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot
+ * on read will thing the version of the row from transaction ID 500 is the latest one.
+ *
+ * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This
+ * set could support a write-through cache for added performance.
+ */
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ // Make sure the user has not requested an insane amount of txns.
+ int maxTxns = HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH);
+ if (numTxns > maxTxns) numTxns = maxTxns;
+
+ stmt = dbConn.createStatement();
+ String s = addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new MetaException("Transaction database not properly " +
+ "configured, can't find next transaction id.");
+ }
+ long first = rs.getLong(1);
+ s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+
+ long now = getDbTime(dbConn);
+ List<Long> txnIds = new ArrayList<Long>(numTxns);
+ ArrayList<String> queries = new ArrayList<String>();
+ String query;
+ String insertClause = "insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host) values ";
+ StringBuilder valuesClause = new StringBuilder();
+
+ for (long i = first; i < first + numTxns; i++) {
+ txnIds.add(i);
+
+ if (i > first &&
+ (i - first) % conf.getIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) {
+ // wrap up the current query, and start a new one
+ query = insertClause + valuesClause.toString();
+ queries.add(query);
+
+ valuesClause.setLength(0);
+ valuesClause.append("(").append(i).append(", 'o', ").append(now).append(", ").append(now)
+ .append(", '").append(rqst.getUser()).append("', '").append(rqst.getHostname())
+ .append("')");
+
+ continue;
+ }
+
+ if (i > first) {
+ valuesClause.append(", ");
+ }
+
+ valuesClause.append("(").append(i).append(", 'o', ").append(now).append(", ").append(now)
+ .append(", '").append(rqst.getUser()).append("', '").append(rqst.getHostname())
+ .append("')");
+ }
+
+ query = insertClause + valuesClause.toString();
+ queries.add(query);
+
+ for (String q : queries) {
+ LOG.debug("Going to execute update <" + q + ">");
+ stmt.execute(q);
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ return new OpenTxnsResponse(txnIds);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
+ throw new MetaException("Unable to select from transaction database "
+ + StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ return openTxns(rqst);
+ }
+ }
+
+  /**
+   * Aborts the single transaction named in the request.
+   *
+   * @param rqst carries the id of the transaction to abort
+   * @throws NoSuchTxnException if the internal abort did not affect exactly one transaction
+   * @throws MetaException      if the DB operation fails and is not retried
+   */
+  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException {
+    long txnid = rqst.getTxnid();
+    try {
+      Connection dbConn = null;
+      try {
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        int abortedCount = abortTxns(dbConn, Collections.singletonList(txnid), true);
+        if (abortedCount != 1) {
+          LOG.debug("Going to rollback");
+          dbConn.rollback();
+          throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid));
+        }
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException sqlEx) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        // Throws RetryException when the failure should be retried.
+        checkRetryable(dbConn, sqlEx, "abortTxn(" + rqst + ")");
+        throw new MetaException("Unable to update transaction database "
+            + StringUtils.stringifyException(sqlEx));
+      } finally {
+        closeDbConn(dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException retry) {
+      abortTxn(rqst);
+    }
+  }
+
+  /**
+   * Aborts every transaction listed in the request. A mismatch between the number requested
+   * and the number actually aborted is only logged as a warning; the change is still committed.
+   *
+   * @param rqst carries the ids of the transactions to abort
+   * @throws MetaException if the DB operation fails and is not retried
+   */
+  public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException {
+    List<Long> txnids = rqst.getTxn_ids();
+    try {
+      Connection dbConn = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        int numAborted = abortTxns(dbConn, txnids, false);
+        int numRequested = txnids.size();
+        if (numAborted != numRequested) {
+          int numMissed = numRequested - numAborted;
+          LOG.warn("Abort Transactions command only abort " + numAborted + " out of " +
+              numRequested + " transactions. It's possible that the other " +
+              numMissed +
+              " transactions have been aborted or committed, or the transaction ids are invalid.");
+        }
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException sqlEx) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        // Throws RetryException when the failure should be retried.
+        checkRetryable(dbConn, sqlEx, "abortTxns(" + rqst + ")");
+        throw new MetaException("Unable to update transaction database "
+            + StringUtils.stringifyException(sqlEx));
+      } finally {
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException retry) {
+      abortTxns(rqst);
+    }
+  }
+
+ /**
+ * Commits the transaction named in the request: records its update/delete write set in
+ * WRITE_SET, aborts it instead if an overlapping committed txn wrote the same component,
+ * and otherwise moves its components to COMPLETED_TXN_COMPONENTS and deletes its
+ * TXN_COMPONENTS, HIVE_LOCKS and TXNS rows.
+ *
+ * Concurrency/isolation notes:
+ * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
+ * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNS table for specific txnid:X
+ * see more notes below.
+ * In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn
+ * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
+ * so that we can compare commit time of txn T with start time of txn S. This sequence can be thought of
+ * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap.
+ *
+ * Motivating example:
+ * Suppose we have multi-statement transactions T and S both of which are attempting x = x + 1
+ * In order to prevent lost update problem, the non-overlapping txns must lock in the snapshot
+ * that they read appropriately. In particular, if txns do not overlap, then one follows the other
+ * (assuming they write the same entity), and thus the 2nd must see changes of the 1st. We ensure
+ * this by locking in snapshot after
+ * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()})
+ * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure
+ * that txn T which will be considered a later txn, locks in a snapshot that includes the result
+ * of S's commit (assuming no other txns).
+ * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
+ * were running in parallel). If T and S both locked in the same snapshot (for example commit of
+ * txnid:2, which is possible if commitTxn() and openTxns() is not mutexed)
+ * 'x' would be updated to the same value by both, i.e. lost update.
+ *
+ * @param rqst carries the id of the transaction to commit
+ * @throws NoSuchTxnException declared; presumably raised by ensureValidTxn() when the txn
+ * row cannot be found - confirm against that method
+ * @throws TxnAbortedException if a write conflict with an already committed txn is found, in
+ * which case this txn is aborted instead of committed
+ * @throws MetaException if the DB operation fails and is not retried
+ */
+ public void commitTxn(CommitTxnRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException {
+ long txnid = rqst.getTxnid();
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet lockHandle = null;
+ ResultSet commitIdRs = null, rs;
+ try {
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ /**
+ * This S4U will mutex with other commitTxn() and openTxns().
+ * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
+ * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start
+ * at the same time and no new txns start until all 3 commit.
+ * We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
+ */
+ commitIdRs = stmt.executeQuery(addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
+ if(!commitIdRs.next()) {
+ throw new IllegalStateException("No rows found in NEXT_TXN_ID");
+ }
+ long commitId = commitIdRs.getLong(1);
+ /**
+ * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures that no other
+ * operation can change this txn (such as acquiring locks). While lock() and commitTxn()
+ * should not normally run concurrently (for same txn) but could due to bugs in the client
+ * which could then corrupt internal transaction manager state. Also competes with abortTxn().
+ */
+ lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+ if(lockHandle == null) {
+ //this also ensures that txn is still there and in expected state (hasn't been timed out)
+ ensureValidTxn(dbConn, txnid, stmt);
+ shouldNeverHappen(txnid);
+ }
+ // Savepoint lets the WRITE_SET insert below be undone on its own if a conflict is found.
+ Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
+ int numCompsWritten = stmt.executeUpdate("insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
+ " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " +
+ "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")");
+ if(numCompsWritten == 0) {
+ /**
+ * current txn didn't update/delete anything (may have inserted), so just proceed with commit
+ *
+ * We only care about commit id for write txns, so for RO (when supported) txns we don't
+ * have to mutex on NEXT_TXN_ID.
+ * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
+ * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
+ * If RO < W, then there is no reads-from relationship.
+ */
+ }
+ else {
+ /**
+ * see if there are any overlapping txns that wrote the same element, i.e. have a conflict
+ * Since entire commit operation is mutexed wrt other start/commit ops,
+ * committed.ws_commit_id <= current.ws_commit_id for all txns
+ * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
+ * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
+ * [17,20] committed and [21,21] committing now - these do not overlap.
+ * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running)
+ */
+ // NOTE(review): 'rs' is closed only on the conflict path below; on the no-conflict path it is
+ // never closed explicitly (the finally block does not reference it) - confirm this is intended.
+ rs = stmt.executeQuery
+ (addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
+ "committed.ws_table, committed.ws_partition, cur.ws_commit_id " +
+ "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
+ "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
+ //For partitioned table we always track writes at partition level (never at table)
+ //and for non partitioned - always at table level, thus the same table should never
+ //have entries with partition key and w/o
+ "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
+ "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
+ // with txnid, though any decent DB should infer this
+ " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
+ // part of this commitTxn() op
+ " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
+ //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
+ " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) +
+ " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")"));
+ if(rs.next()) {
+ //found a conflict
+ String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
+ StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
+ String partitionName = rs.getString(5);
+ if(partitionName != null) {
+ resource.append('/').append(partitionName);
+ }
+ String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
+ " committed by " + committedTxn;
+ close(rs);
+ //remove WRITE_SET info for current txn since it's about to abort
+ dbConn.rollback(undoWriteSetForCurrentTxn);
+ LOG.info(msg);
+ //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
+ if(abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
+ throw new IllegalStateException(msg + " FAILED!");
+ }
+ // The abort itself is committed so that it survives the exception thrown below.
+ dbConn.commit();
+ // NOTE(review): stmt/dbConn are closed here and again in the finally block; presumably
+ // close() tolerates a second call - verify.
+ close(null, stmt, dbConn);
+ throw new TxnAbortedException(msg);
+ }
+ else {
+ //no conflicting operations, proceed with the rest of commit sequence
+ }
+ }
+ // Move the record from txn_components into completed_txn_components so that the compactor
+ // knows where to look to compact.
+ String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
+ "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
+ LOG.debug("Going to execute insert <" + s + ">");
+ if (stmt.executeUpdate(s) < 1) {
+ //this can be reasonable for an empty txn START/COMMIT or read-only txn
+ LOG.info("Expected to move at least one record from txn_components to " +
+ "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
+ }
+ s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ s = "delete from TXNS where txn_id = " + txnid;
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ // Throws RetryException when the failure should be retried.
+ checkRetryable(dbConn, e, "commitTxn(" + rqst + ")");
+ throw new MetaException("Unable to update transaction database "
+ + StringUtils.stringifyException(e));
+ } finally {
+ close(commitIdRs);
+ close(lockHandle, stmt, dbConn);
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ commitTxn(rqst);
+ }
+ }
+ @Override
+ public void performWriteSetGC() {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+ if(!rs.next()) {
+ throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
+ }
+ long highestAllocatedTxnId = rs.getLong(1);
+ close(rs);
+ rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN));
+ if(!rs.next()) {
+ throw new IllegalStateException("Scalar query returned no rows?!?!!");
+ }
+ long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark
+ long lowestOpenTxnId = rs.getLong(1);
+ if(rs.wasNull()) {
+ //if here then there are no Open txns and highestAllocatedTxnId must be
+ //resolved (i.e. committed or aborted), either way
+ //there are no open txns with id <= highestAllocatedTxnId
+ //the +1 is there because "delete ..." below has < (which is correct for the case when
+ //there is an open txn
+ //Concurrency: even if new txn starts (or starts + commits) it is still true that
+ //there are no currently open txns that overlap with any committed txn with
+ //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough.
+ commitHighWaterMark = highestAllocatedTxnId + 1;
+ }
+ else {
+ commitHighWaterMark = lowestOpenTxnId;
+ }
+ int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
+ LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET");
+ dbConn.commit();
+ } catch (SQLException ex) {
+ LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
+ }
+ finally {
+ close(rs, stmt, dbConn);
+ }
+ }
+ /**
+ * As much as possible (i.e. in absence of retries) we want both operations to be done on the same
+ * connection (but separate transactions). This avoid some flakiness in BONECP where if you
+ * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one
+ * doesn't see results of the first.
+ */
+ public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException {
+ ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst);
+ try {
+ return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid());
+ }
+ catch(NoSuchLockException e) {
+ // This should never happen, as we just added the lock id
+ throw new MetaException("Couldn't find a lock we just created! " + e.getMessage());
+ }
+ }
+ /**
+  * Immutable pair handed from {@code enqueueLockWithRetry()} to {@code checkLockWithRetry()}:
+  * the connection the locks were enqueued on and the external lock id they were given.
+  */
+ private static final class ConnectionLockIdPair {
+   private final Connection dbConn;
+   private final long extLockId;
+
+   private ConnectionLockIdPair(Connection connection, long lockId) {
+     dbConn = connection;
+     extLockId = lockId;
+   }
+ }
+
+ /**
+  * Runs a "select ... for update" against the TXNS row of {@code txnId}.
+  *
+  * Note that by definition select for update is divorced from update, i.e. you executeQuery() to
+  * read and then executeUpdate(). One alternative would be to actually update the row in TXNS but
+  * to the same value as before, thus forcing the db to acquire a write lock for the duration of
+  * the transaction.
+  *
+  * There is no real reason to return the ResultSet here other than to make sure the reference to
+  * it is retained for the duration of the intended lock scope and is not GC'd, thus (unlikely)
+  * causing the lock to be released.
+  * @param txnState the state this txn is expected to be in; may be null, in which case the
+  *                 state is not part of the predicate
+  * @return the ResultSet positioned on the matching row, or null if no row was found
+  * @throws SQLException
+  * @throws MetaException
+  */
+ private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
+   StringBuilder select = new StringBuilder("select TXN_STATE from TXNS where TXN_ID = ").append(txnId);
+   if (txnState != null) {
+     select.append(" AND TXN_STATE=").append(quoteChar(txnState));
+   }
+   ResultSet lockedRow = stmt.executeQuery(addForUpdateClause(select.toString()));
+   if (!lockedRow.next()) {
+     // nothing matched: release the cursor and report "not found"
+     close(lockedRow);
+     return null;
+   }
+   return lockedRow;
+ }
+
+ /**
+ * This enters locks into the queue in {@link #LOCK_WAITING} mode.
+ *
+ * On success the connection is deliberately left open and handed back to the caller inside the
+ * returned {@link ConnectionLockIdPair}; it is only closed here on failure/retry.
+ *
+ * Isolation Level Notes:
+ * 1. We use S4U (with read_committed) to generate the next (ext) lock id. This serializes
+ * any 2 {@code enqueueLockWithRetry()} calls.
+ * 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations
+ * @see #checkLockWithRetry(Connection, long, long)
+ */
+ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException {
+ // tracks whether the connection must stay open for the caller (see finally block)
+ boolean success = false;
+ Connection dbConn = null;
+ try {
+ Statement stmt = null;
+ ResultSet rs = null;
+ ResultSet lockHandle = null;
+ try {
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ long txnid = rqst.getTxnid();
+ stmt = dbConn.createStatement();
+ if (isValidTxn(txnid)) {
+ //this also ensures that txn is still there in expected state
+ lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+ if(lockHandle == null) {
+ //no open txn row found: ensureValidTxn() is presumably expected to throw the precise
+ //NoSuchTxn/TxnAborted error; reaching the next line means it did not
+ ensureValidTxn(dbConn, txnid, stmt);
+ shouldNeverHappen(txnid);
+ }
+ }
+ /** Get the next lock id.
+ * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race.
+ * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7,
+ * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks,
+ * and adds its W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)}
+ * doesn't block on locks acquired later than one it's checking*/
+ String s = addForUpdateClause("select nl_next from NEXT_LOCK_ID");
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ throw new MetaException("Transaction tables not properly " +
+ "initialized, no record found in next_lock_id");
+ }
+ long extLockId = rs.getLong(1);
+ s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+
+ if (txnid > 0) {
+ /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get)
+ * So if we add that to LockRequest we'll know that here
+ * Should probably add it to LockComponent so that if in the future we decide to allow 1 LockRequest
+ * to contain LockComponent for multiple operations.
+ * Deriving it from lock info doesn't distinguish between Update and Delete
+ *
+ * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
+ * FileSinkDesc.table is ql.metadata.Table
+ * Table.tableSpec which is TableSpec, which has specType which is SpecType
+ * So maybe this can work to know that this is part of dynamic partition insert in which case
+ * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here.
+ * In any case, that's an optimization for now; will be required when adding multi-stmt txns
+ */
+ // For each component in this lock request,
+ // add an entry to the txn_components table
+ // This must be done before HIVE_LOCKS is accessed
+ for (LockComponent lc : rqst.getComponent()) {
+ String dbName = lc.getDbname();
+ String tblName = lc.getTablename();
+ String partName = lc.getPartitionname();
+ // NOTE(review): names are concatenated into the SQL text as-is; callers are presumably
+ // trusted to pass names without quote characters - confirm
+ s = "insert into TXN_COMPONENTS " +
+ "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) " +
+ "values (" + txnid + ", '" + dbName + "', " +
+ (tblName == null ? "null" : "'" + tblName + "'") + ", " +
+ (partName == null ? "null" : "'" + partName + "'")+ "," +
+ quoteString(OpertaionType.fromLockType(lc.getType()).toString()) + ")";
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ }
+ }
+
+ // internal lock id: 1-based position of the component within this request
+ long intLockId = 0;
+ for (LockComponent lc : rqst.getComponent()) {
+ intLockId++;
+ String dbName = lc.getDbname();
+ String tblName = lc.getTablename();
+ String partName = lc.getPartitionname();
+ LockType lockType = lc.getType();
+ // 'z' is a placeholder that is written as-is if lockType matches none of the cases below
+ char lockChar = 'z';
+ switch (lockType) {
+ case EXCLUSIVE:
+ lockChar = LOCK_EXCLUSIVE;
+ break;
+ case SHARED_READ:
+ lockChar = LOCK_SHARED;
+ break;
+ case SHARED_WRITE:
+ lockChar = LOCK_SEMI_SHARED;
+ break;
+ }
+ long now = getDbTime(dbConn);
+ s = "insert into HIVE_LOCKS " +
+ " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " +
+ "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" +
+ " values (" + extLockId + ", " +
+ +intLockId + "," + txnid + ", '" +
+ dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'")
+ + ", " + (partName == null ? "null" : "'" + partName + "'") +
+ ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " +
+ //for locks associated with a txn, we always heartbeat txn and timeout based on that
+ (isValidTxn(txnid) ? 0 : now) + ", '" +
+ rqst.getUser() + "', '" + rqst.getHostname() + "')";
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ }
+ dbConn.commit();
+ success = true;
+ return new ConnectionLockIdPair(dbConn, extLockId);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ // NOTE(review): checkRetryable() presumably throws RetryException for retryable errors,
+ // which is handled by the outer catch below - confirm against its definition
+ checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")");
+ throw new MetaException("Unable to update transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(lockHandle);
+ close(rs, stmt, null);
+ if (!success) {
+ /* This needs to return a "live" connection to be used by operation that follows it.
+ Thus it only closes Connection on failure/retry. */
+ closeDbConn(dbConn);
+ }
+ unlockInternal();
+ }
+ }
+ catch(RetryException e) {
+ // start over from scratch; the failed attempt's connection was closed in the finally block
+ return enqueueLockWithRetry(rqst);
+ }
+ }
+ /**
+  * Checks the locks identified by {@code extLockId}, retrying for as long as
+  * {@code checkRetryable()} signals a retry via RetryException.
+  * The connection passed in is always closed before this method returns or retries; a retry
+  * therefore obtains a fresh READ_COMMITTED connection.
+  */
+ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId)
+   throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException {
+   while (true) {
+     try {
+       try {
+         lockInternal();
+         if (dbConn.isClosed()) {
+           //should only get here if retrying this op
+           dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         }
+         return checkLock(dbConn, extLockId);
+       } catch (SQLException sqlEx) {
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, sqlEx, "checkLockWithRetry(" + extLockId + "," + txnId + ")");
+         throw new MetaException("Unable to update transaction database " +
+           StringUtils.stringifyException(sqlEx));
+       } finally {
+         unlockInternal();
+         closeDbConn(dbConn);
+       }
+     } catch (RetryException retry) {
+       // go around again with the same arguments
+     }
+   }
+ }
+ /**
+ * Why doesn't this get a txnid as parameter? The caller should either know the txnid or know there isn't one.
+ * Either way getTxnIdFromLockId() will not be needed. This would be a Thrift change.
+ *
+ * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(),
+ * in practice more often)
+ * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB.
+ *
+ * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
+ * We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
+ *
+ * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking
+ * against doesn't move from W to A in another txn) but this method can heartbeat in
+ * separate txn at READ_COMMITTED.
+ */
+ public LockResponse checkLock(CheckLockRequest rqst)
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
+ try {
+ Connection dbConn = null;
+ long extLockId = rqst.getLockid();
+ try {
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ // Heartbeat on the lockid first, to assure that our lock is still valid.
+ // Then look up the lock info (hopefully in the cache). If these locks
+ // are associated with a transaction then heartbeat on that as well.
+ LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
+ if(info == null) {
+ throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+ }
+ if (info.txnId > 0) {
+ heartbeatTxn(dbConn, info.txnId);
+ }
+ else {
+ heartbeatLock(dbConn, extLockId);
+ }
+ //todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and
+ //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired
+ //extra heartbeat is logically harmless, but ...
+ return checkLock(dbConn, extLockId);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "checkLock(" + rqst + " )");
+ throw new MetaException("Unable to update transaction database " +
+ JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
+ } finally {
+ closeDbConn(dbConn);
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ return checkLock(rqst);
+ }
+
+ }
+
+ /**
+ * This would have been made simpler if all locks were associated with a txn. Then only txn needs to
+ * be heartbeated, committed, etc. no need for client to track individual locks.
+ * When removing locks not associated with txn this potentially conflicts with
+ * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will be locked as needed by db.
+ * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed
+ */
+ public void unlock(UnlockRequest rqst)
+ throws NoSuchLockException, TxnOpenException, MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ long extLockId = rqst.getLockid();
+ try {
+ /**
+ * This method is logically like commit for read-only auto commit queries.
+ * READ_COMMITTED since this only has 1 delete statement and no new entries with the
+ * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are
+ * created in a single atomic operation.
+ * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)}
+ * but hl_lock_ext_id is not known until that method returns.
+ * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}
+ * but using SERIALIZABLE doesn't materially change the interaction.
+ * If "delete" stmt misses, additional logic is best effort to produce meaningful error msg.
+ */
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ //hl_txnid <> 0 means it's associated with a transaction
+ String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" +
+ " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))";
+ //(hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "') is for multi-statement txns where
+ //some query attempted to lock (thus LOCK_WAITING state) but is giving up due to timeout for example
+ LOG.debug("Going to execute update <" + s + ">");
+ int rc = stmt.executeUpdate(s);
+ if (rc < 1) {
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
+ if(info == null) {
+ //didn't find any lock with extLockId but at ReadCommitted there is a possibility that
+ //it existed when above delete ran but it didn't have the expected state.
+ LOG.error("No lock in " + LOCK_WAITING + " mode found for unlock(" + rqst + ")");
+ throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
+ }
+ if(info.txnId != 0) {
+ String msg = "Unlocking locks associated with transaction not permitted. " + info;
+ LOG.error(msg);
+ throw new TxnOpenException(msg);
+ }
+ if(info.txnId == 0) {
+ //we didn't see this lock when running DELETE stmt above but now it showed up
+ //so should "should never happen" happened...
+ String msg = "Found lock in unexpected state " + info;
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "unlock(" + rqst + ")");
+ throw new MetaException("Unable to update transaction database " +
+ JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e));
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ unlock(rqst);
+ }
+ }
+
+ /**
+  * A {@link LockInfo} that remembers the response element it was built from; used to sort
+  * entries in {@link org.apache.hadoop.hive.metastore.api.ShowLocksResponse}
+  */
+ private static class LockInfoExt extends LockInfo {
+   private final ShowLocksResponseElement e;
+
+   LockInfoExt(ShowLocksResponseElement element) {
+     super(element);
+     e = element;
+   }
+ }
+ /**
+  * Returns the rows of HIVE_LOCKS, optionally restricted to the database, table and/or
+  * partition named in {@code rqst}, sorted with {@code LockInfoComparator}.
+  *
+  * @param rqst filters from the SHOW LOCKS statement; null or empty names are ignored
+  * @return the matching locks
+  * @throws MetaException if the query fails with a non-retryable error or a row holds an
+  *         unknown lock state or lock type
+  */
+ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException {
+   try {
+     Connection dbConn = null;
+     ShowLocksResponse rsp = new ShowLocksResponse();
+     List<ShowLocksResponseElement> elems = new ArrayList<ShowLocksResponseElement>();
+     List<LockInfoExt> sortedList = new ArrayList<LockInfoExt>();
+     Statement stmt = null;
+     try {
+       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+       stmt = dbConn.createStatement();
+
+       String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
+         "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
+         "hl_blockedby_ext_id, hl_blockedby_int_id from HIVE_LOCKS";
+
+       // Some filters may have been specified in the SHOW LOCKS statement. Add them to the query.
+       String dbName = rqst.getDbname();
+       String tableName = rqst.getTablename();
+       String partName = rqst.getPartname();
+
+       StringBuilder filter = new StringBuilder();
+       if (dbName != null && !dbName.isEmpty()) {
+         filter.append("hl_db=").append(quoteString(dbName));
+       }
+       if (tableName != null && !tableName.isEmpty()) {
+         if (filter.length() > 0) {
+           filter.append(" and ");
+         }
+         filter.append("hl_table=").append(quoteString(tableName));
+       }
+       if (partName != null && !partName.isEmpty()) {
+         if (filter.length() > 0) {
+           filter.append(" and ");
+         }
+         filter.append("hl_partition=").append(quoteString(partName));
+       }
+       String whereClause = filter.toString();
+
+       if (!whereClause.isEmpty()) {
+         s = s + " where " + whereClause;
+       }
+
+       LOG.debug("Doing to execute query <" + s + ">");
+       ResultSet rs = stmt.executeQuery(s);
+       while (rs.next()) {
+         ShowLocksResponseElement e = new ShowLocksResponseElement();
+         e.setLockid(rs.getLong(1));
+         long txnid = rs.getLong(2);
+         if (!rs.wasNull()) e.setTxnid(txnid);
+         e.setDbname(rs.getString(3));
+         e.setTablename(rs.getString(4));
+         String partition = rs.getString(5);
+         if (partition != null) e.setPartname(partition);
+         char lockState = rs.getString(6).charAt(0);
+         switch (lockState) {
+           case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break;
+           case LOCK_WAITING: e.setState(LockState.WAITING); break;
+           default: throw new MetaException("Unknown lock state " + lockState);
+         }
+         char lockType = rs.getString(7).charAt(0);
+         switch (lockType) {
+           case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break;
+           case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break;
+           case LOCK_SHARED: e.setType(LockType.SHARED_READ); break;
+           // report the offending lock *type* (column 7); this used to print the lock state
+           default: throw new MetaException("Unknown lock type " + lockType);
+         }
+         e.setLastheartbeat(rs.getLong(8));
+         long acquiredAt = rs.getLong(9);
+         if (!rs.wasNull()) e.setAcquiredat(acquiredAt);
+         e.setUser(rs.getString(10));
+         e.setHostname(rs.getString(11));
+         e.setLockIdInternal(rs.getLong(12));
+         long id = rs.getLong(13);
+         if(!rs.wasNull()) {
+           e.setBlockedByExtId(id);
+         }
+         id = rs.getLong(14);
+         if(!rs.wasNull()) {
+           e.setBlockedByIntId(id);
+         }
+         sortedList.add(new LockInfoExt(e));
+       }
+       // read-only operation: nothing to commit
+       LOG.debug("Going to rollback");
+       dbConn.rollback();
+     } catch (SQLException e) {
+       checkRetryable(dbConn, e, "showLocks(" + rqst + ")");
+       throw new MetaException("Unable to select from transaction database " +
+         StringUtils.stringifyException(e));
+     } finally {
+       closeStmt(stmt);
+       closeDbConn(dbConn);
+     }
+     //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
+     //by checkLock() - makes diagnostics easier.
+     Collections.sort(sortedList, new LockInfoComparator());
+     for(LockInfoExt lockInfoExt : sortedList) {
+       elems.add(lockInfoExt.e);
+     }
+     rsp.setLocks(elems);
+     return rsp;
+   } catch (RetryException e) {
+     return showLocks(rqst);
+   }
+ }
+
+
+ /**
+ * {@code ids} should only have txnid or lockid but not both, ideally.
+ * Currently DBTxnManager.heartbeat() enforces this.
+ */
+ public void heartbeat(HeartbeatRequest ids)
+ throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException {
+ try {
+ Connection dbConn = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ heartbeatLock(dbConn, ids.getLockid());
+ heartbeatTxn(dbConn, ids.getTxnid());
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "heartbeat(" + ids + ")");
+ throw new MetaException("Unable to select from transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ heartbeat(ids);
+ }
+ }
+
+ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst)
+ throws MetaException {
+ try {
+ Connection dbConn = null;
+ HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse();
+ Set<Long> nosuch = new HashSet<Long>();
+ Set<Long> aborted = new HashSet<Long>();
+ rsp.setNosuch(nosuch);
+ rsp.setAborted(aborted);
+ try {
+ /**
+ * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)}
+ * only has 1 update statement in it and
+ * we only update existing txns, i.e. nothing can add additional txns that this operation
+ * would care about (which would have required SERIALIZABLE)
+ */
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) {
+ try {
+ //todo: do all updates in 1 SQL statement and check update count
+ //if update count is less than was requested, go into more expensive checks
+ //for each txn
+ heartbeatTxn(dbConn, txn);
+ } catch (NoSuchTxnException e) {
+ nosuch.add(txn);
+ } catch (TxnAbortedException e) {
+ aborted.add(txn);
+ }
+ }
+ return rsp;
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "heartbeatTxnRange(" + rqst + ")");
+ throw new MetaException("Unable to select from transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ return heartbeatTxnRange(rqst);
+ }
+ }
+
+ long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException {
+ // Get the id for the next entry in the queue
+ String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
+ LOG.debug("going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new IllegalStateException("Transaction tables not properly initiated, " +
+ "no record found in next_compaction_queue_id");
+ }
+ long id = rs.getLong(1);
+ s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ return id;
+ }
+ /**
+  * Puts a compaction request into COMPACTION_QUEUE in the initiated state.
+  * @return the id assigned to the new queue entry
+  * @throws MetaException on a non-retryable database error or an unexpected compaction type
+  */
+ public long compact(CompactionRequest rqst) throws MetaException {
+   try {
+     Connection dbConn = null;
+     Statement stmt = null;
+     try {
+       lockInternal();
+       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+       stmt = dbConn.createStatement();
+
+       final long id = generateCompactionQueueId(stmt);
+
+       // Translate the requested type into its single-character db representation.
+       final char typeChar;
+       switch (rqst.getType()) {
+         case MAJOR:
+           typeChar = MAJOR_TYPE;
+           break;
+         case MINOR:
+           typeChar = MINOR_TYPE;
+           break;
+         default:
+           LOG.debug("Going to rollback");
+           dbConn.rollback();
+           throw new MetaException("Unexpected compaction type " + rqst.getType().toString());
+       }
+
+       final String partName = rqst.getPartitionname();
+       final boolean hasPartition = partName != null;
+       final boolean hasRunAs = rqst.getRunas() != null;
+
+       // column list; partition and run-as columns are only named when a value is supplied
+       StringBuilder insert = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, cq_table, ");
+       if (hasPartition) {
+         insert.append("cq_partition, ");
+       }
+       insert.append("cq_state, cq_type");
+       if (hasRunAs) {
+         insert.append(", cq_run_as");
+       }
+       // value list, in the same order as the columns above
+       insert.append(") values (").append(id)
+         .append(", '").append(rqst.getDbname())
+         .append("', '").append(rqst.getTablename())
+         .append("', '");
+       if (hasPartition) {
+         insert.append(partName).append("', '");
+       }
+       insert.append(INITIATED_STATE).append("', '").append(typeChar);
+       if (hasRunAs) {
+         insert.append("', '").append(rqst.getRunas());
+       }
+       insert.append("')");
+
+       final String sql = insert.toString();
+       LOG.debug("Going to execute update <" + sql + ">");
+       stmt.executeUpdate(sql);
+       LOG.debug("Going to commit");
+       dbConn.commit();
+       return id;
+     } catch (SQLException sqlEx) {
+       LOG.debug("Going to rollback");
+       rollbackDBConn(dbConn);
+       checkRetryable(dbConn, sqlEx, "compact(" + rqst + ")");
+       throw new MetaException("Unable to select from transaction database " +
+         StringUtils.stringifyException(sqlEx));
+     } finally {
+       closeStmt(stmt);
+       closeDbConn(dbConn);
+       unlockInternal();
+     }
+   } catch (RetryException retry) {
+     return compact(rqst);
+   }
+ }
+
+
+ /**
+  * Lists compactions from both COMPACTION_QUEUE and COMPLETED_COMPACTIONS.
+  * Rows from COMPACTION_QUEUE carry -1 as their end time, which is not copied to the response.
+  */
+ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
+   ShowCompactResponse response = new ShowCompactResponse(new ArrayList<ShowCompactResponseElement>());
+   Connection dbConn = null;
+   Statement stmt = null;
+   try {
+     try {
+       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+       stmt = dbConn.createStatement();
+       //what I want is order by cc_end desc, cc_start asc (but derby has a bug https://issues.apache.org/jira/browse/DERBY-6013)
+       //to sort so that currently running jobs are at the end of the list (bottom of screen)
+       //and currently running ones are sorted by start time
+       //w/o order by likely currently running compactions will be first (LHS of Union)
+       final String query = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " +
+         "cq_start, -1 cc_end, cq_run_as, cq_hadoop_job_id, cq_id from COMPACTION_QUEUE union all " +
+         "select cc_database, cc_table, cc_partition, cc_state, cc_type, cc_worker_id, " +
+         "cc_start, cc_end, cc_run_as, cc_hadoop_job_id, cc_id from COMPLETED_COMPACTIONS";
+       LOG.debug("Going to execute query <" + query + ">");
+       ResultSet rows = stmt.executeQuery(query);
+       while (rows.next()) {
+         ShowCompactResponseElement element = new ShowCompactResponseElement();
+         element.setDbname(rows.getString(1));
+         element.setTablename(rows.getString(2));
+         element.setPartitionname(rows.getString(3));
+         final char state = rows.getString(4).charAt(0);
+         switch (state) {
+           case INITIATED_STATE: element.setState(INITIATED_RESPONSE); break;
+           case WORKING_STATE: element.setState(WORKING_RESPONSE); break;
+           case READY_FOR_CLEANING: element.setState(CLEANING_RESPONSE); break;
+           case FAILED_STATE: element.setState(FAILED_RESPONSE); break;
+           case SUCCEEDED_STATE: element.setState(SUCCEEDED_RESPONSE); break;
+           case ATTEMPTED_STATE: element.setState(ATTEMPTED_RESPONSE); break;
+           default:
+             //do nothing to handle RU/D if we add another status
+         }
+         final char type = rows.getString(5).charAt(0);
+         switch (type) {
+           case MAJOR_TYPE: element.setType(CompactionType.MAJOR); break;
+           case MINOR_TYPE: element.setType(CompactionType.MINOR); break;
+           default:
+             //do nothing to handle RU/D if we add another status
+         }
+         element.setWorkerid(rows.getString(6));
+         element.setStart(rows.getLong(7));
+         final long endTime = rows.getLong(8);
+         if (endTime != -1) {
+           element.setEndTime(endTime);
+         }
+         element.setRunAs(rows.getString(9));
+         element.setHadoopJobId(rows.getString(10));
+         long compactionId = rows.getLong(11);//for debugging
+         response.addToCompacts(element);
+       }
+       LOG.debug("Going to rollback");
+       dbConn.rollback();
+     } catch (SQLException sqlEx) {
+       LOG.debug("Going to rollback");
+       rollbackDBConn(dbConn);
+       checkRetryable(dbConn, sqlEx, "showCompact(" + rqst + ")");
+       throw new MetaException("Unable to select from transaction database " +
+         StringUtils.stringifyException(sqlEx));
+     } finally {
+       closeStmt(stmt);
+       closeDbConn(dbConn);
+     }
+     return response;
+   } catch (RetryException retry) {
+     return showCompact(rqst);
+   }
+ }
+
+
+ private static void shouldNeverHappen(long txnid) {
+ throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid));
+ }
+ private static void shouldNeverHappen(long txnid, long extLockId, long intLockId) {
+ throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
+ + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
+ }
+
+ /**
+  * Replaces the TXN_COMPONENTS rows that {@code enqueueLockWithRetry()} wrote for the given
+  * txn/database/table with one row per partition actually written, as listed in {@code rqst}.
+  * The operation type of the new rows is copied from one of the existing rows.
+  *
+  * @throws IllegalStateException if no existing TXN_COMPONENTS row is found for the
+  *         txn/database/table, so the operation type cannot be determined
+  * @throws MetaException on a non-retryable database error
+  */
+ public void addDynamicPartitions(AddDynamicPartitions rqst)
+   throws NoSuchTxnException, TxnAbortedException, MetaException {
+   Connection dbConn = null;
+   Statement stmt = null;
+   ResultSet lockHandle = null;
+   ResultSet rs = null;
+   try {
+     try {
+       lockInternal();
+       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+       stmt = dbConn.createStatement();
+       lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
+       if(lockHandle == null) {
+         //ensures txn is still there and in expected state
+         ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
+         shouldNeverHappen(rqst.getTxnid());
+       }
+       //we should be able to get this from AddDynamicPartitions object longer term; in fact we'd have to
+       //for multi stmt txns if same table is written more than once per tx
+       // MoveTask knows if it's I/U/D
+       // MoveTask calls Hive.loadDynamicPartitions() which calls HiveMetaStoreClient.addDynamicPartitions()
+       // which ends up here so we'd need to add a field to AddDynamicPartitions.
+       String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid()
+         + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+       //do limit 1 on this; currently they will all have the same operations
+       rs = stmt.executeQuery(addLimitClause(1, findOperationType));
+       if(!rs.next()) {
+         throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid()));
+       }
+       OpertaionType ot = OpertaionType.fromString(rs.getString(1).charAt(0));
+
+       //what if a txn writes the same table > 1 time... let's go with this for now, but really
+       //need to not write this in the first place, i.e. make this delete not needed
+       //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
+       String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
+         quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+       //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
+       //much "wider" than necessary in a lot of cases. Here on the other hand, we know exactly which
+       //partitions have been written to. w/o this WRITE_SET would contain entries for partitions not actually
+       //written to
+       stmt.executeUpdate(deleteSql);
+       for (String partName : rqst.getPartitionnames()) {
+         String s =
+           "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" +
+           rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
+           "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")";
+         LOG.debug("Going to execute update <" + s + ">");
+         stmt.executeUpdate(s);
+       }
+       LOG.debug("Going to commit");
+       dbConn.commit();
+     } catch (SQLException e) {
+       LOG.debug("Going to rollback");
+       rollbackDBConn(dbConn);
+       checkRetryable(dbConn, e, "addDynamicPartitions(" + rqst + ")");
+       throw new MetaException("Unable to insert into from transaction database " +
+         StringUtils.stringifyException(e));
+     } finally {
+       // release the operation-type cursor explicitly instead of relying on the statement close
+       close(rs);
+       close(lockHandle, stmt, dbConn);
+       unlockInternal();
+     }
+   } catch (RetryException e) {
+     addDynamicPartitions(rqst);
+   }
+ }
+
+
+ /**
+ * Clean up corresponding records in metastore tables, specifically:
+ * TXN_COMPONENTS, COMPLETED_TXN_COMPONENTS, COMPACTION_QUEUE, COMPLETED_COMPACTIONS
+ */
+ public void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator) throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+
+ try {
+ String dbName;
+ String tblName;
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ List<String> queries = new ArrayList<String>();
+ StringBuilder buff = new StringBuilder();
+
+ switch (type) {
+ case DATABASE:
+ dbName = db.getName();
+
+ buff.append("delete from TXN_COMPONENTS where tc_database='");
+ buff.append(dbName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+ buff.append(dbName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPACTION_QUEUE where cq_database='");
+ buff.append(dbName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+ buff.append(dbName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ break;
+ case TABLE:
+ dbName = table.getDbName();
+ tblName = table.getTableName();
+
+ buff.append("delete from TXN_COMPONENTS where tc_database='");
+ buff.append(dbName);
+ buff.append("' and tc_table='");
+ buff.append(tblName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+ buff.append(dbName);
+ buff.append("' and ctc_table='");
+ buff.append(tblName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPACTION_QUEUE where cq_database='");
+ buff.append(dbName);
+ buff.append("' and cq_table='");
+ buff.append(tblName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+ buff.append(dbName);
+ buff.append("' and cc_table='");
+ buff.append(tblName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ break;
+ case PARTITION:
+ dbName = table.getDbName();
+ tblName = table.getTableName();
+ List<FieldSchema> partCols = table.getPartitionKeys(); // partition columns
+ List<String> partVals; // partition values
+ String partName;
+
+ while (partitionIterator.hasNext()) {
+ Partition p = partitionIterator.next();
+ partVals = p.getValues();
+ partName = Warehouse.makePartName(partCols, partVals);
+
+ buff.append("delete from TXN_COMPONENTS where tc_database='");
+ buff.append(dbName);
+ buff.append("' and tc_table='");
+ buff.append(tblName);
+ buff.append("' and tc_partition='");
+ buff.append(partName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_TXN_COMPONENTS where ctc_database='");
+ buff.append(dbName);
+ buff.append("' and ctc_table='");
+ buff.append(tblName);
+ buff.append("' and ctc_partition='");
+ buff.append(partName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPACTION_QUEUE where cq_database='");
+ buff.append(dbName);
+ buff.append("' and cq_table='");
+ buff.append(tblName);
+ buff.append("' and cq_partition='");
+ buff.append(partName);
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from COMPLETED_COMPACTIONS where cc_database='");
+ buff.append(dbName);
+ buff.append("' and cc_table='");
+ buff.append(tblName);
+ buff.append("' and cc_partition='");
+ buff.append(partName);
+ buff.append("'");
+ queries.add(buff.toString());
+ }
+
+ break;
+ default:
+ throw new MetaException("Invalid object type for cleanup: " + type);
+ }
+
+ for (String query : queries) {
+ LOG.debug("Going to execute update <" + query + ">");
+ stmt.executeUpdate(query);
+ }
+
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "cleanupRecords");
+ if (e.getMessage().contains("does not exist")) {
+ LOG.warn("Cannot perform cleanup since metastore table does not exist");
+ } else {
+ throw new MetaException("Unable to clean up " + StringUtils.stringifyException(e));
+ }
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ cleanupRecords(type, db, table, partitionIterator);
+ }
+ }
+
+ /**
+  * Counts the rows currently present in the {@code HIVE_LOCKS} table.
+  * For testing only, do not use.
+  *
+  * @return the result of {@code select count(*) from HIVE_LOCKS}
+  * @throws SQLException if a connection cannot be obtained or the query fails
+  * @throws MetaException declared in the signature for callers
+  */
+ @VisibleForTesting
+ public int numLocksInLockTable() throws SQLException, MetaException {
+   Connection dbConn = null;
+   Statement stmt = null;
+   ResultSet rs = null;
+   try {
+     final String countQuery = "select count(*) from HIVE_LOCKS";
+     dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+     stmt = dbConn.createStatement();
+     LOG.debug("Going to execute query <" + countQuery + ">");
+     rs = stmt.executeQuery(countQuery);
+     rs.next();
+     final int lockCount = rs.getInt(1);
+     // Roll back so the transaction opened for this read is not left dangling in the db.
+     dbConn.rollback();
+     return lockCount;
+   } finally {
+     close(rs, stmt, dbConn);
+   }
+ }
+
+ /**
+  * Replaces the {@code timeout} value used by this handler.
+  * For testing only, do not use.
+  *
+  * @param milliseconds the new timeout value
+  * @return the timeout value that was in effect before this call
+  */
+ @VisibleForTesting
+ public long setTimeout(long milliseconds) {
+   long previousTimeout = timeout;
+   timeout = milliseconds;
+   return previousTimeout;
+ }
+
+ /**
+  * Signal thrown by {@code checkRetryable} to tell the calling method that the
+  * failed operation should be attempted again. It carries no state of its own,
+  * so it is a static nested class and does not keep a reference to the
+  * enclosing handler instance.
+  */
+ protected static class RetryException extends Exception {
+
+ }
+
+ /**
+  * Obtains a connection from the pool with auto-commit disabled and the requested
+  * transaction isolation level applied. When {@code doRetryOnConnPool} is set, up to
+  * 10 attempts are made; otherwise a single attempt is made.
+  *
+  * @param isolationLevel one of the {@code Connection.TRANSACTION_*} constants
+  * @return a configured connection
+  * @throws SQLException the failure from the last attempt once attempts are exhausted
+  */
+ protected Connection getDbConn(int isolationLevel) throws SQLException {
+   int attemptsLeft = doRetryOnConnPool ? 10 : 1;
+   Connection dbConn = null;
+   for (;;) {
+     try {
+       dbConn = connPool.getConnection();
+       dbConn.setAutoCommit(false);
+       dbConn.setTransactionIsolation(isolationLevel);
+       return dbConn;
+     } catch (SQLException e) {
+       // Release whatever connection we may be holding before deciding whether to retry.
+       closeDbConn(dbConn);
+       attemptsLeft--;
+       if (attemptsLeft <= 0) {
+         throw e;
+       }
+       LOG.error("There is a problem with a connection from the pool, retrying(rc=" + attemptsLeft + "): " +
+           getMessage(e), e);
+     }
+   }
+ }
+
+ /**
+  * Rolls back the given connection if it is non-null and still open.
+  * A failure to roll back is logged as a warning and not propagated.
+  *
+  * @param dbConn may be {@code null}
+  */
+ static void rollbackDBConn(Connection dbConn) {
+   if (dbConn == null) {
+     return;
+   }
+   try {
+     if (!dbConn.isClosed()) {
+       dbConn.rollback();
+     }
+   } catch (SQLException e) {
+     LOG.warn("Failed to rollback db connection " + getMessage(e));
+   }
+ }
+ /**
+  * Closes the given connection if it is non-null and not already closed.
+  * A failure to close is logged as a warning and not propagated.
+  *
+  * @param dbConn may be {@code null}
+  */
+ protected static void closeDbConn(Connection dbConn) {
+   if (dbConn == null) {
+     return;
+   }
+   try {
+     if (!dbConn.isClosed()) {
+       dbConn.close();
+     }
+   } catch (SQLException e) {
+     LOG.warn("Failed to close db connection " + getMessage(e));
+   }
+ }
+
+ /**
+  * Closes the given statement if it is non-null and not already closed.
+  * A failure to close is logged as a warning and not propagated.
+  *
+  * @param stmt statement instance, may be {@code null}
+  */
+ protected static void closeStmt(Statement stmt) {
+   if (stmt == null) {
+     return;
+   }
+   try {
+     if (!stmt.isClosed()) {
+       stmt.close();
+     }
+   } catch (SQLException e) {
+     LOG.warn("Failed to close statement " + getMessage(e));
+   }
+ }
+
+ /**
+  * Close the ResultSet if it is non-null and not already closed.
+  * A failure to close is logged as a warning and not propagated.
+  *
+  * @param rs may be {@code null}
+  */
+ static void close(ResultSet rs) {
+   try {
+     if (rs != null && !rs.isClosed()) {
+       rs.close();
+     }
+   }
+   catch(SQLException ex) {
+     // Name the right resource so this is not confused with a failure in closeStmt().
+     LOG.warn("Failed to close result set " + getMessage(ex));
+   }
+ }
+
+ /**
+ * Close all 3 JDBC artifacts in order: {@code rs stmt dbConn}.
+ * Each step is delegated to the matching single-resource helper, which skips
+ * {@code null} or already-closed arguments and logs (rather than throws) on failure,
+ * so a problem closing one artifact does not prevent closing the others.
+ * @param rs may be {@code null}
+ * @param stmt may be {@code null}
+ * @param dbConn may be {@code null}
+ */
+ static void close(ResultSet rs, Statement stmt, Connection dbConn) {
+ close(rs);
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ /**
+ * Determine if an exception was such that it makes sense to retry. Unfortunately there is no standard way to do
+ * this, so we have to inspect the error messages and catch the telltale signs for each
+ * different database. This method will throw {@code RetryException}
+ * if the error is retry-able.
+ * @param conn database connection
+ * @param e exception that was thrown.
+ * @param caller name of the method calling this (and other info useful to log)
+ * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when the operation should be retried
+ */
+ protected void checkRetryable(Connection conn,
+ SQLException e,
+ String caller) throws RetryException, MetaException {
+
+ // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
+ // to test these changes.
+ // MySQL and MSSQL use 40001 as the state code for rollback. Postgres uses 40001 and 40P01.
+ // Oracle seems to return different SQLStates and messages each time,
+ // so I've tried to capture the different error messages (there appear to be fewer different
+ // error messages than SQL states).
+ // Derby and newer MySQL driver use the new SQLTransactionRollbackException
+ boolean sendRetrySignal = false;
+ try {
+ if(dbProduct == null) {
+ throw new IllegalStateException("DB Type not determined yet.");
+ }
+ if (e instanceof SQLTransactionRollbackException ||
+ ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES ||
+ dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) ||
+ (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) ||
+ (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected")
+ || e.getMessage().contains("can't serialize access for this transaction")))) {
+ if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
+ long waitInterval = deadlockRetryInterval * deadlockCnt;
+ LOG.warn("Deadlock detected in " + caller + ". Will wait " + waitInterval +
+ "ms try again up to " + (ALLOWED_REPEATED_DEADLOCKS - deadlockCnt + 1) + " times.");
+ // Pause for a just a bit for retrying to avoid immediately jumping back into the deadlock.
+ try {
+ Thread.sleep(waitInterval);
+ } catch (InterruptedException ie) {
+ // NOP
+ }
+ sendRetrySignal = true;
+ } else {
+ LOG.error("Too many repeated deadlocks in " + caller + ", giving up.");
+ }
+ } else if (isRetryable(conf, e)) {
+ //in MSSQL this means Communication Link Failure
+ if (retryNum++ < retryLimit) {
+ LOG.warn("Retryable error detected in " + caller + ". Will wait " + retryInterval +
+ "ms and retry up to " + (retryLimit - retryNum + 1) + " times. Error: " + getMessage(e));
+ try {
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException ex) {
+ //
+ }
+ sendRetrySignal = true;
+ } else {
+ LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
+ }
+ }
+ else {
+ //make sure we know we saw an error that we don't recognize
+ LOG.info("Non-retryable error: " + getMessage(e));
+ }
+ }
+ finally {
+ /*if this method ends with anything except a retry signal, the caller should fail the operation
+ and propagate the error up to the its caller (Metastore client); thus must reset retry counters*/
+ if(!sendRetrySignal) {
+ deadlockCnt = 0;
+ retryNum = 0;
+ }
+ }
+ if(sendRetrySignal) {
+ throw new RetryException();
+ }
+ }
+
+ /**
+  * Determine the current time, using the RDBMS as a source of truth.
+  * The query issued depends on the database product, since each one spells
+  * "current timestamp" differently.
+  * @param conn database connection; it is left open for the caller
+  * @return current time in milliseconds
+  * @throws org.apache.hadoop.hive.metastore.api.MetaException if the time cannot be determined
+  */
+ protected long getDbTime(Connection conn) throws MetaException {
+   Statement stmt = null;
+   ResultSet rs = null;
+   try {
+     stmt = conn.createStatement();
+     String s;
+     switch (dbProduct) {
+       case DERBY:
+         s = "values current_timestamp";
+         break;
+
+       case MYSQL:
+       case POSTGRES:
+       case SQLSERVER:
+         s = "select current_timestamp";
+         break;
+
+       case ORACLE:
+         s = "select current_timestamp from dual";
+         break;
+
+       default:
+         String msg = "Unknown database product: " + dbProduct.toString();
+         LOG.error(msg);
+         throw new MetaException(msg);
+     }
+     LOG.debug("Going to execute query <" + s + ">");
+     rs = stmt.executeQuery(s);
+     if (!rs.next()) throw new MetaException("No results from date query");
+     return rs.getTimestamp(1).getTime();
+   } catch (SQLException e) {
+     String msg = "Unable to determine current time: " + e.getMessage();
+     LOG.error(msg);
+     throw new MetaException(msg);
+   } finally {
+     // Release the result set explicitly instead of relying on the statement close to do it.
+     close(rs);
+     closeStmt(stmt);
+   }
+ }
+
+ /**
+  * Returns the string used to quote SQL identifiers. The value is read from the
+  * connection's metadata on first use and kept in {@code identifierQuoteString}
+  * for later calls.
+  * @param conn Active connection
+  * @return quotes
+  * @throws SQLException if the connection metadata cannot be read
+  */
+ protected String getIdentifierQuoteString(Connection conn) throws SQLException {
+   if (identifierQuoteString != null) {
+     return identifierQuoteString;
+   }
+   identifierQuoteString = conn.getMetaData().getIdentifierQuoteString();
+   return identifierQuoteString;
+ }
+
+ /** Database products this class distinguishes when choosing product-specific SQL and error handling. */
+ protected enum DatabaseProduct { DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER}
+
+ /**
+ * Determine the database product type
+ * @param conn database connection
+ * @return database product type
+ */
+ private DatabaseProduct determineDatabaseProduct(Connection conn) {
+ if (dbProduct == null) {
+ try {
+ String s = conn.getMetaData().getDatabaseProductName();
+ if (s == null) {
+ String msg = "getDatabaseProductName returns null, can't determine database product";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ } else if (s.equals("Apache Derby")) {
+ dbProduct = DatabaseProduct.DERBY;
+ } else if (s.equals("Microsoft SQL Server")) {
+ dbProduct = DatabaseProduct.SQLSERVER;
+ } else if (s.equals("MySQL")) {
+ dbProduct = DatabaseProduct.MYSQL;
+ } else if (s.equals("Oracle")) {
+ dbProduct = DatabaseProduct.ORACLE;
+ } else if (s.equals("PostgreSQL")) {
+ dbProduct = DatabaseProduct.POSTGRES;
+ } else {
+ String msg = "Unrecognized database product name <" + s + ">";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+
+ } catch (SQLException e) {
+ String msg = "Unable to get database product name: " + e.getMessage();
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ }
+ return dbProduct;
+ }
+
+ private static class LockInfo {
+ private final long extLockId;
+ private final long intLockId;
+ //0 means there is no transaction, i.e. it a select statement which is not part of
+ //explicit transaction or a IUD statement that is not writing to ACID table
+ private final long txnId;
+ private final String db;
+ private final String table;
+ private final String partition;
+ private final LockState state;
+ private final LockType type;
+
+ // Assumes the result set is set to a valid row
+ LockInfo(ResultSet rs) throws SQLException, MetaException {
+ extLockId = rs.getLong("hl_lock_ext_id"); // can't be null
+ intLockId = rs.getLong("hl_lock_int_id"); // can't be null
+ db = rs.getString("hl_db"); // can't be null
+ String t = rs.getString("hl_table");
+ table = (rs.wasNull() ? null : t);
+ String p = rs.getString("hl_partition");
+ partition = (rs.wasNull() ? null : p);
+ switch (rs.getString("hl_lock_state").charAt(0)) {
+ case LOCK_WAITING: state = LockState.WAITING; break;
+ case LOCK_ACQUIRED: state = LockState.ACQUIRED; break;
+ default:
+ throw new MetaException("Unknown lock state " + rs.getString("hl_lock_state").charAt(0));
+ }
+ switch (rs.getString("hl_lock_type").charAt(0)) {
+ case LOCK_EXCLUSIVE: type = LockType.EXCLUSIVE; break;
+ case LOCK_SHARED: type = LockType.SHARED_READ; break;
+ case LOCK_SEMI_SHARED: type = LockType.SHARED_WRITE; break;
+ default:
+ throw new MetaException("Unknown lock type " + rs.getString("hl_lock_type").charAt(0));
+ }
+ txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL
+ }
+ LockInfo(ShowLocksResponseElement e) {
+ extLockId = e.getLockid();
+ intLockId = e.getLockIdInternal();
+ txnId = e.getTxnid();
+ db = e.getDbname();
+ table = e.getTablename();
+ partition = e.getPartname();
+ state = e.getState();
+ type = e.getType();
+ }
+
+ public boolean equals(Object other) {
+ if (!(other instanceof LockInfo)) return false;
+ LockInfo o = (LockInfo)other;
+ // Lock ids are unique across the system.
+ return extLockId == o.extLockId && intLockId == o.intLockId;
+ }
+
+ @Override
+ public String toString() {
+ return JavaUtils.lockIdToString(extLockId) + " intLockId:" +
+ intLockId + " " + JavaUtils.txnIdToString(txnId)
+ + " db:" + db + " table:" + table + " partition:" +
+ partition + " state:" + (state == null ? "null" : state.toString())
+ + " type:" + (type == null ? "null" : type.toString());
+ }
+ private boolean isDbLock() {
+ return db != null && table == null && partition == null;
+ }
+ private boolean isTableLock() {
+ return db != null && table != null && partition == null;
+ }
+ }
+
+ private static class LockInfoComparator implements Comparator<LockInfo> {
+ private static final LockTypeComparator lockTypeComparator = new LockTypeComparator();
+ public boolean equals(Object other) {
+ return this == other;
+ }
+
+ public int compare(LockInfo info1, LockInfo info2) {
+ // We sort by state (acquired vs waiting) and then by LockType, they by id
+ if (info1.state == LockState.ACQUIRED &&
+ info2.state != LockState .ACQUIRED) {
+ return -1;
+ }
+ if (info1.state != LockState.ACQUIRED &&
+ info2.state == LockState .ACQUIRED) {
+ return 1;
+ }
+
+ int sortByType = lockTypeComparator.compare(info1.type, info2.type);
+ if(sortByType != 0) {
+ return sortByType;
+ }
+ if (info1.extLockId < info2.extLockId) {
+ return -1;
+ } else if (info1.extLockId > info2.extLockId) {
+ return 1;
+ } else {
+ if (info1.intLockId < info2.intLockId) {
+ return -1;
+ } else i
<TRUNCATED>