You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2018/07/19 19:55:14 UTC

[15/51] [partial] hive git commit: HIVE-20188 : Split server-specific code outside of standalone metastore-common (Alexander Kolbasov reviewed by Vihang Karajgaonkar)

http://git-wip-us.apache.org/repos/asf/hive/blob/081fa368/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
deleted file mode 100644
index 9dd3787..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ /dev/null
@@ -1,5051 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.Savepoint;
-import java.sql.Statement;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-
-import javax.sql.DataSource;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
-import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.common.classification.RetrySemantics;
-import org.apache.hadoop.hive.metastore.DatabaseProduct;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
-import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
-import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
-import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
-import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
-import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
-import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
-import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
-import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
-import org.apache.hadoop.hive.metastore.utils.JavaUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.StringableMap;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A handler to answer transaction related calls that come into the metastore
- * server.
- *
- * Note on log messages:  Please include txnid:X and lockid info using
- * {@link JavaUtils#txnIdToString(long)}
- * and {@link JavaUtils#lockIdToString(long)} in all messages.
- * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated,
- * so keeping the format consistent makes grep'ing the logs much easier.
- *
- * Note on HIVE_LOCKS.hl_last_heartbeat.
- * For locks that are part of transaction, we set this 0 (would rather set it to NULL but
- * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
- * transaction in TXNS.
- *
- * In general there can be multiple metastores where this logic can execute, thus the DB is
- * used to ensure proper mutexing of operations.
- * Select ... For Update (or equivalent: either MsSql with(updlock) or actual Update stmt) is
- * used to properly sequence operations.  Most notably:
- * 1. various sequence IDs are generated with aid of this mutex
- * 2. ensuring that each (Hive) Transaction state is transitioned atomically.  Transaction state
- *  includes its actual state (Open, Aborted) as well as it's lock list/component list.  Thus all
- *  per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row.
- *  This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
- * 3. checkLock() - this is mutexted entirely since we must ensure that while we check if some lock
- *  can be granted, no other (strictly speaking "earlier") lock can change state.
- *
- * The exception to his is Derby which doesn't support proper S4U.  Derby is always running embedded
- * (this is the only supported configuration for Derby)
- * in the same JVM as HiveMetaStoreHandler thus we use JVM wide lock to properly sequnce the operations.
- *
- * {@link #derbyLock}
-
- * If we ever decide to run remote Derby server, according to
- * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be
- * seriazlied, so that would also work though has not been tested.
- *
- * General design note:
- * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is
- * still valid and active.  In the code this is usually achieved at the same time the txn record
- * is locked for some operation.
- * 
- * Note on retry logic:
- * Metastore has retry logic in both {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient}
- * and {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}.  The retry logic there is very
- * generic and is not aware whether the operations are idempotent or not.  (This is separate from
- * retry logic here in TxnHander which can/does retry DB errors intelligently).  The worst case is
- * when an op here issues a successful commit against the RDBMS but the calling stack doesn't
- * receive the ack and retries.  (If an op fails before commit, it's trivially idempotent)
- * Thus the ops here need to be made idempotent as much as possible or
- * the metstore call stack should have logic not to retry.  There are {@link RetrySemantics}
- * annotations to document the behavior.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
-
-  static final protected char INITIATED_STATE = 'i';
-  static final protected char WORKING_STATE = 'w';
-  static final protected char READY_FOR_CLEANING = 'r';
-  static final char FAILED_STATE = 'f';
-  static final char SUCCEEDED_STATE = 's';
-  static final char ATTEMPTED_STATE = 'a';
-
-  // Compactor types
-  static final protected char MAJOR_TYPE = 'a';
-  static final protected char MINOR_TYPE = 'i';
-
-  // Transaction states
-  static final protected char TXN_ABORTED = 'a';
-  static final protected char TXN_OPEN = 'o';
-  //todo: make these like OperationType and remove above char constatns
-  enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN}
-
-  public enum TxnType {
-    DEFAULT(0), REPL_CREATED(1), READ_ONLY(2);
-
-    private final int value;
-    TxnType(int value) {
-      this.value = value;
-    }
-
-    public int getValue() {
-      return value;
-    }
-  }
-
-  // Lock states
-  static final protected char LOCK_ACQUIRED = 'a';
-  static final protected char LOCK_WAITING = 'w';
-
-  // Lock types
-  static final protected char LOCK_EXCLUSIVE = 'e';
-  static final protected char LOCK_SHARED = 'r';
-  static final protected char LOCK_SEMI_SHARED = 'w';
-
-  static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
-  static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
-
-  static private DataSource connPool;
-  private static DataSource connPoolMutex;
-  static private boolean doRetryOnConnPool = false;
-
-  private List<TransactionalMetaStoreEventListener> transactionalListeners;
-  
-  private enum OpertaionType {
-    SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d');
-    private final char sqlConst;
-    OpertaionType(char sqlConst) {
-      this.sqlConst = sqlConst;
-    }
-    public String toString() {
-      return Character.toString(sqlConst);
-    }
-    public static OpertaionType fromString(char sqlConst) {
-      switch (sqlConst) {
-        case 's':
-          return SELECT;
-        case 'i':
-          return INSERT;
-        case 'u':
-          return UPDATE;
-        case 'd':
-          return DELETE;
-        default:
-          throw new IllegalArgumentException(quoteChar(sqlConst));
-      }
-    }
-    public static OpertaionType fromDataOperationType(DataOperationType dop) {
-      switch (dop) {
-        case SELECT:
-          return OpertaionType.SELECT;
-        case INSERT:
-          return OpertaionType.INSERT;
-        case UPDATE:
-          return OpertaionType.UPDATE;
-        case DELETE:
-          return OpertaionType.DELETE;
-        default:
-          throw new IllegalArgumentException("Unexpected value: " + dop);
-      }
-    }
-  }
-
-  // Maximum number of open transactions that's allowed
-  private static volatile int maxOpenTxns = 0;
-  // Whether number of open transactions reaches the threshold
-  private static volatile boolean tooManyOpenTxns = false;
-
-  /**
-   * Number of consecutive deadlocks we have seen
-   */
-  private int deadlockCnt;
-  private long deadlockRetryInterval;
-  protected Configuration conf;
-  private static DatabaseProduct dbProduct;
-  private static SQLGenerator sqlGenerator;
-
-  // (End user) Transaction timeout, in milliseconds.
-  private long timeout;
-
-  private String identifierQuoteString; // quotes to use for quoting tables, where necessary
-  private long retryInterval;
-  private int retryLimit;
-  private int retryNum;
-  // Current number of open txns
-  private AtomicInteger numOpenTxns;
-
-  /**
-   * Derby specific concurrency control
-   */
-  private static final ReentrantLock derbyLock = new ReentrantLock(true);
-  /**
-   * must be static since even in UT there may be > 1 instance of TxnHandler
-   * (e.g. via Compactor services)
-   */
-  private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>();
-  private static final String hostname = JavaUtils.hostname();
-
-  // Private methods should never catch SQLException and then throw MetaException.  The public
-  // methods depend on SQLException coming back so they can detect and handle deadlocks.  Private
-  // methods should only throw MetaException when they explicitly know there's a logic error and
-  // they want to throw past the public methods.
-  //
-  // All public methods that write to the database have to check for deadlocks when a SQLException
-  // comes back and handle it if they see one.  This has to be done with the connection pooling
-  // in mind.  To do this they should call checkRetryable() AFTER rolling back the db transaction,
-  // and then they should catch RetryException and call themselves recursively. See commitTxn for an example.
-
-  public TxnHandler() {
-  }
-
-  /**
-   * This is logically part of c'tor and must be called prior to any other method.
-   * Not physically part of c'tor due to use of reflection
-   */
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-
-    checkQFileTestHack();
-
-    synchronized (TxnHandler.class) {
-      if (connPool == null) {
-        Connection dbConn = null;
-        // Set up the JDBC connection pool
-        try {
-          int maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
-          long getConnectionTimeoutMs = 30000;
-          connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs);
-          /*the mutex pools should ideally be somewhat larger since some operations require 1
-           connection from each pool and we want to avoid taking a connection from primary pool
-           and then blocking because mutex pool is empty.  There is only 1 thread in any HMS trying
-           to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock.  The CheckLock operation gets a
-           connection from connPool first, then connPoolMutex.  All others, go in the opposite
-           order (not very elegant...).  So number of connection requests for connPoolMutex cannot
-           exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
-          connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs);
-          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-          determineDatabaseProduct(dbConn);
-          sqlGenerator = new SQLGenerator(dbProduct, conf);
-        } catch (SQLException e) {
-          String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage();
-          LOG.error(msg);
-          throw new RuntimeException(e);
-        } finally {
-          closeDbConn(dbConn);
-        }
-      }
-    }
-
-    numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS);
-
-    timeout = MetastoreConf.getTimeVar(conf, ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
-    buildJumpTable();
-    retryInterval = MetastoreConf.getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL,
-        TimeUnit.MILLISECONDS);
-    retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
-    deadlockRetryInterval = retryInterval / 10;
-    maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
-
-    try {
-      transactionalListeners = MetaStoreUtils.getMetaStoreListeners(
-              TransactionalMetaStoreEventListener.class,
-                      conf, MetastoreConf.getVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS));
-    } catch(MetaException e) {
-      String msg = "Unable to get transaction listeners, " + e.getMessage();
-      LOG.error(msg);
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  @RetrySemantics.ReadOnly
-  public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
-    try {
-      // We need to figure out the current transaction number and the list of
-      // open transactions.  To avoid needing a transaction on the underlying
-      // database we'll look at the current transaction number first.  If it
-      // subsequently shows up in the open list that's ok.
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        /**
-         * This method can run at READ_COMMITTED as long as long as
-         * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
-         * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
-         * adding corresponding entries into TXNS.  The reason is that any txnid below HWM
-         * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select ntxn_next - 1 from NEXT_TXN_ID";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_txn_id");
-        }
-        long hwm = rs.getLong(1);
-        if (rs.wasNull()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, null record found in next_txn_id");
-        }
-        close(rs);
-        List<TxnInfo> txnInfos = new ArrayList<>();
-        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "select txn_id, txn_state, txn_user, txn_host, txn_started, txn_last_heartbeat from " +
-            "TXNS where txn_id <= " + hwm;
-        LOG.debug("Going to execute query<" + s + ">");
-        rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          char c = rs.getString(2).charAt(0);
-          TxnState state;
-          switch (c) {
-            case TXN_ABORTED:
-              state = TxnState.ABORTED;
-              break;
-
-            case TXN_OPEN:
-              state = TxnState.OPEN;
-              break;
-
-            default:
-              throw new MetaException("Unexpected transaction state " + c +
-                " found in txns table");
-          }
-          TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4));
-          txnInfo.setStartedTime(rs.getLong(5));
-          txnInfo.setLastHeartbeatTime(rs.getLong(6));
-          txnInfos.add(txnInfo);
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        return new GetOpenTxnsInfoResponse(hwm, txnInfos);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "getOpenTxnsInfo");
-        throw new MetaException("Unable to select from transaction database: " + getMessage(e)
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return getOpenTxnsInfo();
-    }
-  }
-  @Override
-  @RetrySemantics.ReadOnly
-  public GetOpenTxnsResponse getOpenTxns() throws MetaException {
-    try {
-      // We need to figure out the current transaction number and the list of
-      // open transactions.  To avoid needing a transaction on the underlying
-      // database we'll look at the current transaction number first.  If it
-      // subsequently shows up in the open list that's ok.
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        /**
-         * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "select ntxn_next - 1 from NEXT_TXN_ID";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_txn_id");
-        }
-        long hwm = rs.getLong(1);
-        if (rs.wasNull()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, null record found in next_txn_id");
-        }
-        close(rs);
-        List<Long> openList = new ArrayList<>();
-        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm + " order by txn_id";
-        LOG.debug("Going to execute query<" + s + ">");
-        rs = stmt.executeQuery(s);
-        long minOpenTxn = Long.MAX_VALUE;
-        BitSet abortedBits = new BitSet();
-        while (rs.next()) {
-          long txnId = rs.getLong(1);
-          openList.add(txnId);
-          char c = rs.getString(2).charAt(0);
-          if(c == TXN_OPEN) {
-            minOpenTxn = Math.min(minOpenTxn, txnId);
-          } else if (c == TXN_ABORTED) {
-            abortedBits.set(openList.size() - 1);
-          }
-        }
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
-        GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer);
-        if(minOpenTxn < Long.MAX_VALUE) {
-          otr.setMin_open_txn(minOpenTxn);
-        }
-        return otr;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "getOpenTxns");
-        throw new MetaException("Unable to select from transaction database, "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return getOpenTxns();
-    }
-  }
-
-  /**
-   * Retry-by-caller note:
-   * Worst case, it will leave an open txn which will timeout.
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
-    if (!tooManyOpenTxns && numOpenTxns.get() >= maxOpenTxns) {
-      tooManyOpenTxns = true;
-    }
-    if (tooManyOpenTxns) {
-      if (numOpenTxns.get() < maxOpenTxns * 0.9) {
-        tooManyOpenTxns = false;
-      } else {
-        LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " +
-            "reached. Current number of open transactions: " + numOpenTxns);
-        throw new MetaException("Maximum allowed number of open transactions has been reached. " +
-            "See hive.max.open.txns.");
-      }
-    }
-
-    int numTxns = rqst.getNum_txns();
-    if (numTxns <= 0) {
-      throw new MetaException("Invalid input for number of txns: " + numTxns);
-    }
-
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        lockInternal();
-        /**
-         * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
-         * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
-         * Also, advancing the counter must work when multiple metastores are running.
-         * SELECT ... FOR UPDATE is used to prevent
-         * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
-         *
-         * In the current design, there can be several metastore instances running in a given Warehouse.
-         * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
-         * a client may go to MS1 and start a transaction with ID 500 to update a particular row.
-         * Now the same client will start another transaction, except it ends up on MS2 and may get
-         * transaction ID 400 and update the same row.  Now the merge that happens to materialize the snapshot
-         * on read will thing the version of the row from transaction ID 500 is the latest one.
-         *
-         * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations).  This
-         * set could support a write-through cache for added performance.
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        // Make sure the user has not requested an insane amount of txns.
-        int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
-        if (numTxns > maxTxns) numTxns = maxTxns;
-
-        stmt = dbConn.createStatement();
-        List<Long> txnIds = openTxns(dbConn, stmt, rqst);
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return new OpenTxnsResponse(txnIds);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return openTxns(rqst);
-    }
-  }
-
-  private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst)
-          throws SQLException, MetaException {
-    int numTxns = rqst.getNum_txns();
-    ResultSet rs = null;
-    TxnType txnType = TxnType.DEFAULT;
-    try {
-      if (rqst.isSetReplPolicy()) {
-        List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
-
-        if (!targetTxnIdList.isEmpty()) {
-          if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
-            LOG.warn("target txn id number " + targetTxnIdList.toString() +
-                    " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString());
-          }
-          LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy :" +
-                  rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString());
-          return targetTxnIdList;
-        }
-        txnType = TxnType.REPL_CREATED;
-      }
-
-      String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        throw new MetaException("Transaction database not properly " +
-                "configured, can't find next transaction id.");
-      }
-      long first = rs.getLong(1);
-      s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
-      LOG.debug("Going to execute update <" + s + ">");
-      stmt.executeUpdate(s);
-
-      long now = getDbTime(dbConn);
-      List<Long> txnIds = new ArrayList<>(numTxns);
-
-      List<String> rows = new ArrayList<>();
-      for (long i = first; i < first + numTxns; i++) {
-        txnIds.add(i);
-        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","
-                + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue());
-      }
-      List<String> queries = sqlGenerator.createInsertValuesStmt(
-            "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows);
-      for (String q : queries) {
-        LOG.debug("Going to execute update <" + q + ">");
-        stmt.execute(q);
-      }
-
-      // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
-      s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN);
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        throw new IllegalStateException("Scalar query returned no rows?!?!!");
-      }
-
-      // TXNS table should have atleast one entry because we just inserted the newly opened txns.
-      // So, min(txn_id) would be a non-zero txnid.
-      long minOpenTxnId = rs.getLong(1);
-      assert (minOpenTxnId > 0);
-      rows.clear();
-      for (long txnId = first; txnId < first + numTxns; txnId++) {
-        rows.add(txnId + ", " + minOpenTxnId);
-      }
-
-      // Insert transaction entries into MIN_HISTORY_LEVEL.
-      List<String> inserts = sqlGenerator.createInsertValuesStmt(
-              "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows);
-      for (String insert : inserts) {
-        LOG.debug("Going to execute insert <" + insert + ">");
-        stmt.execute(insert);
-      }
-      LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
-              + ") with min_open_txn: " + minOpenTxnId);
-
-      if (rqst.isSetReplPolicy()) {
-        List<String> rowsRepl = new ArrayList<>();
-
-        for (int i = 0; i < numTxns; i++) {
-          rowsRepl.add(
-                  quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
-        }
-
-        List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
-                "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl);
-
-        for (String query : queriesRepl) {
-          LOG.info("Going to execute insert <" + query + ">");
-          stmt.execute(query);
-        }
-      }
-
-      if (transactionalListeners != null) {
-        MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
-      }
-      return txnIds;
-    } finally {
-      close(rs);
-    }
-  }
-
-  private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Statement stmt)
-          throws SQLException {
-    ResultSet rs = null;
-    try {
-      List<String> inQueries = new ArrayList<>();
-      StringBuilder prefix = new StringBuilder();
-      StringBuilder suffix = new StringBuilder();
-      List<Long> targetTxnIdList = new ArrayList<>();
-      prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where ");
-      suffix.append(" and RTM_REPL_POLICY = " + quoteString(replPolicy));
-      TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, sourceTxnIdList,
-              "RTM_SRC_TXN_ID", false, false);
-      for (String query : inQueries) {
-        LOG.debug("Going to execute select <" + query + ">");
-        rs = stmt.executeQuery(query);
-        while (rs.next()) {
-          targetTxnIdList.add(rs.getLong(1));
-        }
-      }
-      LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString());
-      return targetTxnIdList;
-    }  catch (SQLException e) {
-      LOG.warn("failed to get target txn ids " + e.getMessage());
-      throw e;
-    } finally {
-      close(rs);
-    }
-  }
-
-  @Override
-  @RetrySemantics.Idempotent
-  public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt);
-        if (targetTxnIds.isEmpty()) {
-          LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy);
-          return -1;
-        }
-        assert (targetTxnIds.size() == 1);
-        return targetTxnIds.get(0);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "getTargetTxnId(" + replPolicy + sourceTxnId + ")");
-        throw new MetaException("Unable to get target transaction id "
-                + StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return getTargetTxnId(replPolicy, sourceTxnId);
-    }
-  }
-
-  @Override
-  @RetrySemantics.Idempotent
-  public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
-    long txnid = rqst.getTxnid();
-    long sourceTxnId = -1;
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        if (rqst.isSetReplPolicy()) {
-          sourceTxnId = rqst.getTxnid();
-          List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
-                  Collections.singletonList(sourceTxnId), stmt);
-          if (targetTxnIds.isEmpty()) {
-            LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
-                    " and repl policy " + rqst.getReplPolicy());
-            return;
-          }
-          assert targetTxnIds.size() == 1;
-          txnid = targetTxnIds.get(0);
-        }
-
-        if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
-          TxnStatus status = findTxnState(txnid,stmt);
-          if(status == TxnStatus.ABORTED) {
-            if (rqst.isSetReplPolicy()) {
-              // in case of replication, idempotent is taken care by getTargetTxnId
-              LOG.warn("Invalid state ABORTED for transactions started using replication replay task");
-              String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-                      " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-              LOG.info("Going to execute  <" + s + ">");
-              stmt.executeUpdate(s);
-            }
-            LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) +
-              ") requested by it is already " + TxnStatus.ABORTED);
-            return;
-          }
-          raiseTxnUnexpectedState(status, txnid);
-        }
-
-        if (rqst.isSetReplPolicy()) {
-          String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-              " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-          LOG.info("Going to execute  <" + s + ">");
-          stmt.executeUpdate(s);
-        }
-
-        if (transactionalListeners != null) {
-          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                  EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator);
-        }
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "abortTxn(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      abortTxn(rqst);
-    }
-  }
-
-  @Override
-  @RetrySemantics.Idempotent
-  public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException {
-    List<Long> txnids = rqst.getTxn_ids();
-    try {
-      Connection dbConn = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        int numAborted = abortTxns(dbConn, txnids, false);
-        if (numAborted != txnids.size()) {
-          LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " +
-              txnids.size() + " transactions. It's possible that the other " +
-              (txnids.size() - numAborted) +
-              " transactions have been aborted or committed, or the transaction ids are invalid.");
-        }
-
-        for (Long txnId : txnids) {
-          if (transactionalListeners != null) {
-            MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                    EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator);
-          }
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "abortTxns(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-            + StringUtils.stringifyException(e));
-      } finally {
-        closeDbConn(dbConn);
-      }
-    } catch (RetryException e) {
-      abortTxns(rqst);
-    }
-  }
-
-  /**
-   * Concurrency/isolation notes:
-   * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
-   * operations using select4update on NEXT_TXN_ID.  Also, mutexes on TXNX table for specific txnid:X
-   * see more notes below.
-   * In order to prevent lost updates, we need to determine if any 2 transactions overlap.  Each txn
-   * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
-   * so that we can compare commit time of txn T with start time of txn S.  This sequence can be thought of
-   * as a logical time counter.  If S.commitTime < T.startTime, T and S do NOT overlap.
-   *
-   * Motivating example:
-   * Suppose we have multi-statment transactions T and S both of which are attempting x = x + 1
-   * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot
-   * that they read appropriately.  In particular, if txns do not overlap, then one follows the other
-   * (assumig they write the same entity), and thus the 2nd must see changes of the 1st.  We ensure
-   * this by locking in snapshot after 
-   * {@link #openTxns(OpenTxnRequest)} call is made (see org.apache.hadoop.hive.ql.Driver.acquireLocksAndOpenTxn)
-   * and mutexing openTxn() with commit().  In other words, once a S.commit() starts we must ensure
-   * that txn T which will be considered a later txn, locks in a snapshot that includes the result
-   * of S's commit (assuming no other txns).
-   * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
-   * were running in parallel).  If T and S both locked in the same snapshot (for example commit of
-   * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed)
-   * 'x' would be updated to the same value by both, i.e. lost update. 
-   */
-  @Override
-  @RetrySemantics.Idempotent("No-op if already committed")
-  public void commitTxn(CommitTxnRequest rqst)
-    throws NoSuchTxnException, TxnAbortedException, MetaException {
-    char isUpdateDelete = 'N';
-    long txnid = rqst.getTxnid();
-    long sourceTxnId = -1;
-
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet lockHandle = null;
-      ResultSet commitIdRs = null, rs;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        if (rqst.isSetReplPolicy()) {
-          sourceTxnId = rqst.getTxnid();
-          List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
-                  Collections.singletonList(sourceTxnId), stmt);
-          if (targetTxnIds.isEmpty()) {
-            LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
-                    " and repl policy " + rqst.getReplPolicy());
-            return;
-          }
-          assert targetTxnIds.size() == 1;
-          txnid = targetTxnIds.get(0);
-        }
-
-        /**
-         * Runs at READ_COMMITTED with S4U on TXNS row for "txnid".  S4U ensures that no other
-         * operation can change this txn (such acquiring locks). While lock() and commitTxn()
-         * should not normally run concurrently (for same txn) but could due to bugs in the client
-         * which could then corrupt internal transaction manager state.  Also competes with abortTxn().
-         */
-        lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
-        if (lockHandle == null) {
-          //if here, txn was not found (in expected state)
-          TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
-          if(actualTxnStatus == TxnStatus.COMMITTED) {
-            if (rqst.isSetReplPolicy()) {
-              // in case of replication, idempotent is taken care by getTargetTxnId
-              LOG.warn("Invalid state COMMITTED for transactions started using replication replay task");
-            }
-            /**
-             * This makes the operation idempotent
-             * (assume that this is most likely due to retry logic)
-             */
-            LOG.info("Nth commitTxn(" + JavaUtils.txnIdToString(txnid) + ") msg");
-            return;
-          }
-          raiseTxnUnexpectedState(actualTxnStatus, txnid);
-          shouldNeverHappen(txnid);
-          //dbConn is rolled back in finally{}
-        }
-
-        String conflictSQLSuffix = null;
-        if (rqst.isSetReplPolicy()) {
-          rs = null;
-        } else {
-          conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
-                  quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
-          rs = stmt.executeQuery(sqlGenerator.addLimitClause(1,
-                  "tc_operation_type " + conflictSQLSuffix));
-        }
-        if (rs != null && rs.next()) {
-          isUpdateDelete = 'Y';
-          close(rs);
-          //if here it means currently committing txn performed update/delete and we should check WW conflict
-          /**
-           * This S4U will mutex with other commitTxn() and openTxns(). 
-           * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
-           * Note: it's possible to have several txns have the same commit id.  Suppose 3 txns start
-           * at the same time and no new txns start until all 3 commit.
-           * We could've incremented the sequence for commitId is well but it doesn't add anything functionally.
-           */
-          commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
-          if (!commitIdRs.next()) {
-            throw new IllegalStateException("No rows found in NEXT_TXN_ID");
-          }
-          long commitId = commitIdRs.getLong(1);
-          Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
-          /**
-           * "select distinct" is used below because
-           * 1. once we get to multi-statement txns, we only care to record that something was updated once
-           * 2. if {@link #addDynamicPartitions(AddDynamicPartitions)} is retried by caller it my create
-           *  duplicate entries in TXN_COMPONENTS
-           * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct
-           * even if it includes all of it's columns
-           */
-          int numCompsWritten = stmt.executeUpdate(
-            "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
-            " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix);
-          /**
-           * see if there are any overlapping txns wrote the same element, i.e. have a conflict
-           * Since entire commit operation is mutexed wrt other start/commit ops,
-           * committed.ws_commit_id <= current.ws_commit_id for all txns
-           * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
-           * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
-           * [17,20] committed and [21,21] committing now - these do not overlap.
-           * [17,18] committed and [18,19] committing now - these overlap  (here 18 started while 17 was still running)
-           */
-          rs = stmt.executeQuery
-            (sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
-              "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " +
-              "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " +
-              "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
-              "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
-              //For partitioned table we always track writes at partition level (never at table)
-              //and for non partitioned - always at table level, thus the same table should never
-              //have entries with partition key and w/o
-              "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
-              "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
-              // with txnid, though any decent DB should infer this
-              " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
-              // part of this commitTxn() op
-              " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
-              //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
-              " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) +
-              " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")"));
-          if (rs.next()) {
-            //found a conflict
-            String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
-            StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
-            String partitionName = rs.getString(5);
-            if (partitionName != null) {
-              resource.append('/').append(partitionName);
-            }
-            String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
-              " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
-            close(rs);
-            //remove WRITE_SET info for current txn since it's about to abort
-            dbConn.rollback(undoWriteSetForCurrentTxn);
-            LOG.info(msg);
-            //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
-            if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
-              throw new IllegalStateException(msg + " FAILED!");
-            }
-            dbConn.commit();
-            close(null, stmt, dbConn);
-            throw new TxnAbortedException(msg);
-          } else {
-            //no conflicting operations, proceed with the rest of commit sequence
-          }
-        }
-        else {
-          /**
-           * current txn didn't update/delete anything (may have inserted), so just proceed with commit
-           *
-           * We only care about commit id for write txns, so for RO (when supported) txns we don't
-           * have to mutex on NEXT_TXN_ID.
-           * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
-           * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
-           * If RO < W, then there is no reads-from relationship.
-           * In replication flow we don't expect any write write conflict as it should have been handled at source.
-           */
-        }
-
-        String s;
-        if (!rqst.isSetReplPolicy()) {
-          // Move the record from txn_components into completed_txn_components so that the compactor
-          // knows where to look to compact.
-          s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
-                  "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid, tc_database, tc_table, " +
-                  "tc_partition, tc_writeid, '" + isUpdateDelete + "' from TXN_COMPONENTS where tc_txnid = " + txnid;
-          LOG.debug("Going to execute insert <" + s + ">");
-
-          if ((stmt.executeUpdate(s)) < 1) {
-            //this can be reasonable for an empty txn START/COMMIT or read-only txn
-            //also an IUD with DP that didn't match any rows.
-            LOG.info("Expected to move at least one record from txn_components to " +
-                    "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
-          }
-        } else {
-          if (rqst.isSetWriteEventInfos()) {
-            List<String> rows = new ArrayList<>();
-            for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
-              rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," +
-                      quoteString(writeEventInfo.getTable()) + "," +
-                      quoteString(writeEventInfo.getPartition()) + "," +
-                      writeEventInfo.getWriteId() + "," +
-                      "'" + isUpdateDelete + "'");
-            }
-            List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " +
-                    "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)", rows);
-            for (String q : queries) {
-              LOG.debug("Going to execute insert  <" + q + "> ");
-              stmt.execute(q);
-            }
-          }
-
-          s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-                  " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-          LOG.info("Repl going to execute  <" + s + ">");
-          stmt.executeUpdate(s);
-        }
-
-        // cleanup all txn related metadata
-        s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "delete from TXNS where txn_id = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        s = "delete from MIN_HISTORY_LEVEL where mhl_txnid = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-        LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
-
-        s = "delete from MATERIALIZATION_REBUILD_LOCKS where mrl_txn_id = " + txnid;
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
-
-        if (transactionalListeners != null) {
-          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                  EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator);
-        }
-
-        LOG.debug("Going to commit");
-        close(rs);
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "commitTxn(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(commitIdRs);
-        close(lockHandle, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      commitTxn(rqst);
-    }
-  }
-
-  /**
-   * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
-   * @param rqst info on table/partitions and writeid snapshot to replicate.
-   * @throws MetaException
-   */
-  @Override
-  @RetrySemantics.Idempotent("No-op if already replicated the writeid state")
-  public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException {
-    String dbName = rqst.getDbName().toLowerCase();
-    String tblName = rqst.getTableName().toLowerCase();
-    ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(rqst.getValidWriteIdlist());
-
-    // Get the abortedWriteIds which are already sorted in ascending order.
-    List<Long> abortedWriteIds = getAbortedWriteIds(validWriteIdList);
-    int numAbortedWrites = abortedWriteIds.size();
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        // Check if this txn state is already replicated for this given table. If yes, then it is
-        // idempotent case and just return.
-        String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
-                        + " and nwi_table = " + quoteString(tblName);
-        LOG.debug("Going to execute query <" + sql + ">");
-
-        rs = stmt.executeQuery(sql);
-        if (rs.next()) {
-          LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> is already applied for the table: "
-                  + dbName + "." + tblName);
-          rollbackDBConn(dbConn);
-          return;
-        }
-
-        if (numAbortedWrites > 0) {
-          // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted.
-          List<Long> txnIds = openTxns(dbConn, stmt,
-                  new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName()));
-          assert(numAbortedWrites == txnIds.size());
-
-          // Map each aborted write id with each allocated txn.
-          List<String> rows = new ArrayList<>();
-          int i = 0;
-          for (long txn : txnIds) {
-            long writeId = abortedWriteIds.get(i++);
-            rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
-            LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
-          }
-
-          // Insert entries to TXN_TO_WRITE_ID for aborted write ids
-          List<String> inserts = sqlGenerator.createInsertValuesStmt(
-                  "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
-          for (String insert : inserts) {
-            LOG.debug("Going to execute insert <" + insert + ">");
-            stmt.execute(insert);
-          }
-
-          // Abort all the allocated txns so that the mapped write ids are referred as aborted ones.
-          int numAborts = abortTxns(dbConn, txnIds, true);
-          assert(numAborts == numAbortedWrites);
-        }
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
-
-        // There are some txns in the list which has no write id allocated and hence go ahead and do it.
-        // Get the next write id for the given table and update it with new next write id.
-        // It is expected NEXT_WRITE_ID doesn't have entry for this table and hence directly insert it.
-        long nextWriteId = validWriteIdList.getHighWatermark() + 1;
-
-        // First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
-        sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-                + quoteString(dbName) + "," + quoteString(tblName) + ","
-                + Long.toString(nextWriteId) + ")";
-        LOG.debug("Going to execute insert <" + sql + ">");
-        stmt.execute(sql);
-
-        LOG.info("WriteId state <" + validWriteIdList + "> is applied for the table: " + dbName + "." + tblName);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-                + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-        if(handle != null) {
-          handle.releaseLocks();
-        }
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      replTableWriteIdState(rqst);
-    }
-
-    // Schedule Major compaction on all the partitions/table to clean aborted data
-    if (numAbortedWrites > 0) {
-      CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(),
-              CompactionType.MAJOR);
-      if (rqst.isSetPartNames()) {
-        for (String partName : rqst.getPartNames()) {
-          compactRqst.setPartitionname(partName);
-          compact(compactRqst);
-        }
-      } else {
-        compact(compactRqst);
-      }
-    }
-  }
-
-  private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) {
-    List<Long> abortedWriteIds = new ArrayList<>();
-    for (long writeId : validWriteIdList.getInvalidWriteIds()) {
-      if (validWriteIdList.isWriteIdAborted(writeId)) {
-        abortedWriteIds.add(writeId);
-      }
-    }
-    return abortedWriteIds;
-  }
-
-  @Override
-  @RetrySemantics.ReadOnly
-  public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
-          throws NoSuchTxnException, MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ValidTxnList validTxnList;
-
-      // We should prepare the valid write ids list based on validTxnList of current txn.
-      // If no txn exists in the caller, then they would pass null for validTxnList and so it is
-      // required to get the current state of txns to make validTxnList
-      if (rqst.isSetValidTxnList()) {
-        validTxnList = new ValidReadTxnList(rqst.getValidTxnList());
-      } else {
-        // Passing 0 for currentTxn means, this validTxnList is not wrt to any txn
-        validTxnList = TxnUtils.createValidReadTxnList(getOpenTxns(), 0);
-      }
-      try {
-        /**
-         * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
-         */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        // Get the valid write id list for all the tables read by the current txn
-        List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>();
-        for (String fullTableName : rqst.getFullTableNames()) {
-          tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, fullTableName, validTxnList));
-        }
-
-        LOG.debug("Going to rollback");
-        dbConn.rollback();
-        GetValidWriteIdsResponse owr = new GetValidWriteIdsResponse(tblValidWriteIdsList);
-        return owr;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "getValidWriteIds");
-        throw new MetaException("Unable to select from transaction database, "
-                + StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return getValidWriteIds(rqst);
-    }
-  }
-
-  // Method to get the Valid write ids list for the given table
-  // Input fullTableName is expected to be of format <db_name>.<table_name>
-  private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullTableName,
-                                               ValidTxnList validTxnList) throws SQLException {
-    ResultSet rs = null;
-    String[] names = TxnUtils.getDbTableName(fullTableName);
-    try {
-      // Need to initialize to 0 to make sure if nobody modified this table, then current txn
-      // shouldn't read any data.
-      // If there is a conversion from non-acid to acid table, then by default 0 would be assigned as
-      // writeId for data from non-acid table and so writeIdHwm=0 would ensure those data are readable by any txns.
-      long writeIdHwm = 0;
-      List<Long> invalidWriteIdList = new ArrayList<>();
-      long minOpenWriteId = Long.MAX_VALUE;
-      BitSet abortedBits = new BitSet();
-      long txnHwm = validTxnList.getHighWatermark();
-
-      // Find the writeId high water mark based upon txnId high water mark. If found, then, need to
-      // traverse through all write Ids less than writeId HWM to make exceptions list.
-      // The writeHWM = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm))
-      String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm
-              + " and t2w_database = " + quoteString(names[0])
-              + " and t2w_table = " + quoteString(names[1]);
-      LOG.debug("Going to execute query<" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (rs.next()) {
-        writeIdHwm = rs.getLong(1);
-      }
-
-      // If no writeIds allocated by txns under txnHwm, then find writeHwm from NEXT_WRITE_ID.
-      if (writeIdHwm <= 0) {
-        // Need to subtract 1 as nwi_next would be the next write id to be allocated but we need highest
-        // allocated write id.
-        s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = " + quoteString(names[0])
-                + " and nwi_table = " + quoteString(names[1]);
-        LOG.debug("Going to execute query<" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (rs.next()) {
-          long maxWriteId = rs.getLong(1);
-          if (maxWriteId > 0) {
-            writeIdHwm = (writeIdHwm > 0) ? Math.min(maxWriteId, writeIdHwm) : maxWriteId;
-          }
-        }
-      }
-
-      // As writeIdHwm is known, query all writeIds under the writeId HWM.
-      // If any writeId under HWM is allocated by txn > txnId HWM or belongs to open/aborted txns,
-      // then will be added to invalid list. The results should be sorted in ascending order based
-      // on write id. The sorting is needed as exceptions list in ValidWriteIdList would be looked-up
-      // using binary search.
-      s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + writeIdHwm
-              + " and t2w_database = " + quoteString(names[0])
-              + " and t2w_table = " + quoteString(names[1])
-              + " order by t2w_writeid asc";
-
-      LOG.debug("Going to execute query<" + s + ">");
-      rs = stmt.executeQuery(s);
-      while (rs.next()) {
-        long txnId = rs.getLong(1);
-        long writeId = rs.getLong(2);
-        if (validTxnList.isTxnValid(txnId)) {
-          // Skip if the transaction under evaluation is already committed.
-          continue;
-        }
-
-        // The current txn is either in open or aborted state.
-        // Mark the write ids state as per the txn state.
-        invalidWriteIdList.add(writeId);
-        if (validTxnList.isTxnAborted(txnId)) {
-          abortedBits.set(invalidWriteIdList.size() - 1);
-        } else {
-          minOpenWriteId = Math.min(minOpenWriteId, writeId);
-        }
-      }
-
-      ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
-      TableValidWriteIds owi = new TableValidWriteIds(fullTableName, writeIdHwm, invalidWriteIdList, byteBuffer);
-      if (minOpenWriteId < Long.MAX_VALUE) {
-        owi.setMinOpenWriteId(minOpenWriteId);
-      }
-      return owi;
-    } finally {
-      close(rs);
-    }
-  }
-
-  @Override
-  @RetrySemantics.Idempotent
-  public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
-          throws NoSuchTxnException, TxnAbortedException, MetaException {
-    List<Long> txnIds;
-    String dbName = rqst.getDbName().toLowerCase();
-    String tblName = rqst.getTableName().toLowerCase();
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
-      List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
-      List<TxnToWriteId> srcTxnToWriteIds = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        if (rqst.isSetReplPolicy()) {
-          srcTxnToWriteIds = rqst.getSrcTxnToWriteIdList();
-          List<Long> srcTxnIds = new ArrayList<>();
-          assert (rqst.isSetSrcTxnToWriteIdList());
-          assert (!rqst.isSetTxnIds());
-          assert (!srcTxnToWriteIds.isEmpty());
-
-          for (TxnToWriteId txnToWriteId :  srcTxnToWriteIds) {
-            srcTxnIds.add(txnToWriteId.getTxnId());
-          }
-          txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
-          if (srcTxnIds.size() != txnIds.size()) {
-            LOG.warn("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
-                    " and repl policy " + rqst.getReplPolicy());
-            throw new RuntimeException("This should never happen for txnIds: " + txnIds);
-          }
-        } else {
-          assert (!rqst.isSetSrcTxnToWriteIdList());
-          assert (rqst.isSetTxnIds());
-          txnIds = rqst.getTxnIds();
-        }
-
-        Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
-
-        // Check if all the input txns are in open state. Write ID should be allocated only for open transactions.
-        if (!isTxnsInOpenState(txnIds, stmt)) {
-          ensureAllTxnsValid(dbName, tblName, txnIds, stmt);
-          throw new RuntimeException("This should never happen for txnIds: " + txnIds);
-        }
-
-        long writeId;
-        String s;
-        long allocatedTxnsCount = 0;
-        long txnId;
-        List<String> queries = new ArrayList<>();
-        StringBuilder prefix = new StringBuilder();
-        StringBuilder suffix = new StringBuilder();
-
-        // Traverse the TXN_TO_WRITE_ID to see if any of the input txns already have allocated a
-        // write id for the same db.table. If yes, then need to reuse it else have to allocate new one
-        // The write id would have been already allocated in case of multi-statement txns where
-        // first write on a table will allocate write id and rest of the writes should re-use it.
-        prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where"
-                        + " t2w_database = " + quoteString(dbName)
-                        + " and t2w_table = " + quoteString(tblName) + " and ");
-        suffix.append("");
-        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-                txnIds, "t2w_txnid", false, false);
-        for (String query : queries) {
-          LOG.debug("Going to execute query <" + query + ">");
-          rs = stmt.executeQuery(query);
-          while (rs.next()) {
-            // If table write ID is already allocated for the given transaction, then just use it
-            txnId = rs.getLong(1);
-            writeId = rs.getLong(2);
-            txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
-            allocatedTxnsCount++;
-            LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId);
-          }
-        }
-
-        // Batch allocation should always happen atomically. Either write ids for all txns is allocated or none.
-        long numOfWriteIds = txnIds.size();
-        assert ((allocatedTxnsCount == 0) || (numOfWriteIds == allocatedTxnsCount));
-        if (allocatedTxnsCount == numOfWriteIds) {
-          // If all the txns in the list have pre-allocated write ids for the given table, then just return.
-          // This is for idempotent case.
-          return new AllocateTableWriteIdsResponse(txnToWriteIds);
-        }
-
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
-
-        // There are some txns in the list which does not have write id allocated and hence go ahead and do it.
-        // Get the next write id for the given table and update it with new next write id.
-        // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
-        s = sqlGenerator.addForUpdateClause(
-                "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
-                        + " and nwi_table = " + quoteString(tblName));
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          // First allocation of write id should add the table to the next_write_id meta table
-          // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here
-          writeId = 1;
-          s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-                  + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")";
-          LOG.debug("Going to execute insert <" + s + ">");
-          stmt.execute(s);
-        } else {
-          writeId = rs.getLong(1);
-          // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
-          s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds)
-                  + " where nwi_database = " + quoteString(dbName)
-                  + " and nwi_table = " + quoteString(tblName);
-          LOG.debug("Going to execute update <" + s + ">");
-          stmt.executeUpdate(s);
-        }
-
-        // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated
-        // write ids
-        List<String> rows = new ArrayList<>();
-        for (long txn : txnIds) {
-          rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
-          txnToWriteIds.add(new TxnToWriteId(txn, writeId));
-          LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
-          writeId++;
-        }
-
-        if (rqst.isSetReplPolicy()) {
-          int lastIdx = txnToWriteIds.size()-1;
-          if ((txnToWriteIds.get(0).getWriteId() != srcTxnToWriteIds.get(0).getWriteId()) ||
-              (txnToWriteIds.get(lastIdx).getWriteId() != srcTxnToWriteIds.get(lastIdx).getWriteId())) {
-            LOG.error("Allocated write id range {} is not matching with the input write id range {}.",
-                    txnToWriteIds, srcTxnToWriteIds);
-            throw new IllegalStateException("Write id allocation failed for: " + srcTxnToWriteIds);
-          }
-        }
-
-        // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
-        List<String> inserts = sqlGenerator.createInsertValuesStmt(
-                "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
-        for (String insert : inserts) {
-          LOG.debug("Going to execute insert <" + insert + ">");
-          stmt.execute(insert);
-        }
-
-        if (transactionalListeners != null) {
-          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                  EventMessage.EventType.ALLOC_WRITE_ID,
-                  new AllocWriteIdEvent(txnToWriteIds, rqst.getDbName(), rqst.getTableName(), null),
-                  dbConn, sqlGenerator);
-        }
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return new AllocateTableWriteIdsResponse(txnToWriteIds);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-                + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-        if(handle != null) {
-          handle.releaseLocks();
-        }
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return allocateTableWriteIds(rqst);
-    }
-  }
-  @Override
-  public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst)
-      throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
-        //since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry
-        //for this table.  It also has a unique index in case 'should not' is violated
-
-        // First allocation of write id should add the table to the next_write_id meta table
-        // The initial value for write id should be 1 and hence we add 1 with number of write ids
-        // allocated here
-        String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-            + quoteString(rqst.getDbName()) + "," + quoteString(rqst.getTblName()) + "," +
-            Long.toString(rqst.getSeeWriteId() + 1) + ")";
-        LOG.debug("Going to execute insert <" + s + ">");
-        stmt.execute(s);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "seedWriteIdOnAcidConversion(" + rqst + ")");
-        throw new MetaException("Unable to update transaction database "
-            + StringUtils.stringifyException(e));
-      } finally {
-        close(null, stmt, dbConn);
-        if(handle != null) {
-          handle.releaseLocks();
-        }
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      seedWriteIdOnAcidConversion(rqst);
-    }
-
-  }
-  @Override
-  @RetrySemantics.Idempotent
-  public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent)
-          throws MetaException {
-    Connection dbConn = null;
-    try {
-      try {
-        //Idempotent case is handled by notify Event
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                EventMessage.EventType.ACID_WRITE, acidWriteEvent, dbConn, sqlGenerator);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        if (isDuplicateKeyError(e)) {
-          // in case of key duplicate error, retry as it might be because of race condition
-          if (waitForRetry("addWriteNotificationLog(" + acidWriteEvent + ")", e.getMessage())) {
-            throw new RetryException();
-          }
-          retryNum = 0;
-          throw new MetaException(e.getMessage());
-        }
-        checkRetryable(dbConn, e, "addWriteNotificationLog(" + acidWriteEvent + ")");
-        throw new MetaException("Unable to add write notification event " + StringUtils.stringifyException(e));
-      } finally{
-        closeDbConn(dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      addWriteNotificationLog(acidWriteEvent);
-    }
-  }
-
-  @Override
-  @RetrySemantics.SafeToRetry
-  public void performWriteSetGC() {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
-      rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
-      if(!rs.next()) {
-        throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
-      }
-      long highestAllocatedTxnId = rs.getLong(1);
-      close(rs);
-      rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN));
-      if(!rs.next()) {
-        throw new IllegalStateException("Scalar query returned no rows?!?!!");
-      }
-      long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark
-      long lowestOpenTxnId = rs.getLong(1);
-      if(rs.wasNull()) {
-        //if here then there are no Open txns and  highestAllocatedTxnId must be
-        //resolved (i.e. committed or aborted), either way
-        //there are no open txns with id <= highestAllocatedTxnId
-        //the +1 is there because "delete ..." below has < (which is correct for the case when
-        //there is an open txn
-        //Concurrency: even if new txn starts (or starts + commits) it is still true that
-        //there are no currently open txns that overlap with any committed txn with 
-        //commitId <= commitHighWaterMark (as set on next line).  So plain READ_COMMITTED is enough.
-        commitHighWaterMark = highestAllocatedTxnId + 1;
-      }
-      else {
-        commitHighWaterMark = lowestOpenTxnId;
-      }
-      int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
-      LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET");
-      dbConn.commit();
-    } catch (SQLException ex) {
-      LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
-    }
-    finally {
-      close(rs, stmt, dbConn);
-    }
-  }
-
-  /**
-   * Get invalidation info for the materialization. Currently, the materialization information
-   * only contains information about whether there was update/delete operations on the source
-   * tables used by the materialization since it was created.
-   */
-  @Override
-  @RetrySemantics.ReadOnly
-  public Materialization getMaterializationInvalidationInfo(
-      CreationMetadata creationMetadata, String validTxnListStr) throws MetaException {
-    if (creationMetadata.getTablesUsed().isEmpty()) {
-      // Bail out
-      LOG.warn("Materialization creation metadata does not contain any table");
-      return null;
-    }
-
-    // Parse validTxnList
-    final ValidReadTxnList validTxnList =
-        new ValidReadTxnList(validTxnListStr);
-
-    // Parse validReaderWriteIdList from creation metadata
-    final ValidTxnWriteIdList validReaderWriteIdList =
-        new ValidTxnWriteIdList(creationMetadata.getValidTxnList());
-
-    // We are composing a query that returns a single row if an update happened after
-    // the materialization was created. Otherwise, query returns 0 rows.
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
-      stmt.setMaxRows(1);
-      StringBuilder query = new StringBuilder();
-      // compose a query that select transactions containing an update...
-      query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND (");
-      int i = 0;
-      for (String fullyQualifiedName : creationMetadata.getTablesUsed()) {
-        // ...for each of the tables that are part of the materialized view,
-        // where the transaction had to be committed after the materialization was created...
-        if (i != 0) {
-          query.append("OR");
-        }
-        String[] names = TxnUtils.getDbTableName(fullyQualifiedName);
-        query.append(" (ctc_database=" + quoteString(names[0]) + " AND ctc_table=" + quoteString(names[1]));
-        ValidWriteIdList tblValidWriteIdList =
-            validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
-        if (tblValidWriteIdList == null) {
-          LOG.warn("ValidWriteIdList for table {} not present in creation metadata, this should not happen");
-          return null;
-        }
-        query.append(" AND (ctc_writeid > " + tblValidWriteIdList.getHighWatermark());
-        query.append(tblValidWriteIdList.getInvalidWriteIds().length == 0 ? ") " :
-            " OR ctc_writeid IN(" + StringUtils.join(",",
-                Arrays.asList(ArrayUtils.toObject(tblValidWriteIdList.getInvalidWriteIds()))) + ") ");
-        query.append(") ");
-        i++;
-      }
-      // ... and where the transaction has already been committed as per snapshot taken
-      // when we are running current query
-      query.append(") AND ctc_txnid <= " + validTxnList.getHighWatermark());
-      query.append(validTxnList.getInvalidTransactions().length == 0 ? " " :
-          " AND ctc_txnid NOT IN(" + StringUtils.join(",",
-              Arrays.asList(ArrayUtils.toObject(validTxnList.getInvalidTransactions()))) + ") ");
-
-      // Execute query
-      String s = query.toString();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Going to execute query <" + s + ">");
-      }
-      rs = stmt.executeQuery(s);
-
-      return new Materialization(rs.next());
-    } catch (SQLException ex) {
-      LOG.warn("getMaterializationInvalidationInfo failed due to " + getMessage(ex), ex);
-      throw new MetaException("Unable to retrieve materialization invalidation information due to " +
-          StringUtils.stringifyException(ex));
-    } finally {
-      close(rs, stmt, dbConn);
-    }
-  }
-
-  @Override
-  public LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId)
-      throws MetaException {
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Acquiring lock for materialization rebuild with txnId={} for {}", txnId, Warehouse.getQualifiedName(dbName,tableName));
-    }
-
-    TxnStore.MutexAPI.LockHandle handle = null;
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      lockInternal();
-      /**
-       * MUTEX_KEY.MaterializationRebuild lock ensures that there is only 1 entry in
-       * Initiated/Working state for any resource. This ensures we do not run concurrent
-       * rebuild operations on any materialization.
-       */
-      handle = getMutexAPI().acquireLock(MUTEX_KEY.MaterializationRebuild.name());
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
-
-      String selectQ = "select mrl_txn_id from MATERIALIZATION_REBUILD_LOCKS where" +
-          " mrl_db_name =" + quoteString(dbName) +
-          " AND mrl_tbl_name=" + quoteString(tableName);
-      LOG.debug("Going to execute query <" + selectQ + ">");
-      rs = stmt.executeQuery(selectQ);
-      if(rs.next()) {
-        LOG.info("Ignoring request to rebuild " + dbName + "/" + tableName +
-            " since it is already being rebuilt");
-        return new LockResponse(txnId, LockState.NOT_ACQUIRED);
-      }
-      String insertQ = "insert into MATERIALIZATION_REBUILD_LOCKS " +
-          "(mrl_txn_id, mrl_db_name, mrl_tbl_name, mrl_last_heartbeat) values (" + txnId +
-          ", '" + dbName + "', '" + tableName + "', " + Instant.now().toEpochMilli() + ")";
-      LOG.debug("Going to execute update <" + insertQ + ">");
-      stmt.executeUpdate(insertQ);
-      LOG.debug("Going to commit");
-      dbConn.commit();
-      return new LockResponse(txnId, LockState.ACQUIRED);
-    } catch (SQLException ex) {
-      LOG.warn("lockMaterializationRebuild failed due to " + getMessage(ex), ex);
-      throw new MetaException("Unable to retrieve materialization invalidation information due to " +
-          StringUtils.stringifyException(ex));
-    } finally {
-      close(rs, stmt, dbConn);
-      if(handle != null) {
-        handle.releaseLocks();
-      }
-      unlockInternal();
-    }
-  }
-
-  @Override
-  public boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId)
-      throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "update MATERIALIZATION_REBUILD_LOCKS" +
-            " set mrl_last_heartbeat = " + Instant.now().toEpochMilli() +
-            " where mrl_txn_id = " + txnId +
-            " AND mrl_db_name =" + quoteString(dbName) +
-            " AND mrl_tbl_name=" + quoteString(tableName);
-        LOG.debug("Going to execute update <" + s + ">");
-        int rc = stmt.executeUpdate(s);
-        if (rc < 1) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          LOG.info("No lock found for rebuild of " + Warehouse.getQualifiedName(dbName, tableName) +
-              " when trying to heartbeat");
-          // It could not be renewed, return that information
-          return false;
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        // It could be renewed, return that information
-        return true;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e,
-            "heartbeatLockMaterializationRebuild(" + Warehouse.getQualifiedName(dbName, tableName) + ", " + txnId + ")");
-        throw new MetaException("Unable to heartbeat rebuild lock due to " +
-            StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return heartbeatLockMaterializationRebuild(dbName, tableName ,txnId);
-    }
-  }
-
-  @Override
-  public long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, long timeout) throws MetaException {
-    try {
-      // Aux values
-      long cnt = 0L;
-      List<Long> txnIds = new ArrayList<>();
-      long timeoutTime = Instant.now().toEpochMilli() - timeout;
-
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-      try {
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
-        String selectQ = "select mrl_txn_id, mrl_last_heartbeat from MATERIALIZATION_REBUILD_LOCKS";
-        LOG.debug("Going to execute query <" + selectQ + ">");
-        rs = stmt.executeQuery(selectQ);
-        while(rs.next()) {
-          long lastHeartbeat = rs.getLong(2);
-          if (lastHeartbeat < timeoutTime) {
-            // The heartbeat has timeout, double check whether we can remove it
-            long txnId = rs.getLong(1);
-            if (validTxnList.isTxnValid(txnId) || validTxnList.isTxnAborted(txnId)) {
-              // Txn was committed (but notification was not received) or it was aborted.
-              // Either case, we can clean it up
-              txnIds.add(txnId);
-            }
-          }
-        }
-        if (!txnIds.isEmpty()) {
-          String deleteQ = "delete from MATERIALIZATION_REBUILD_LOCKS where" +
-              " mrl_txn_id IN(" + StringUtils.join(",", txnIds) + ") ";
-          LOG.debug("Going to execute update <" + deleteQ + ">");
-          cnt = stmt.executeUpdate(deleteQ);
-        }
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return cnt;
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "cleanupMaterializationRebuildLocks");
-        throw new MetaException("Unable to clean rebuild locks due to " +
-            StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-        unlockInternal();
-      }
-    } catch (RetryException e) {
-      return cleanupMaterializationRebuildLocks(validTxnList, timeout);
-    }
-  }
-
-  /**
-   * As much as possible (i.e. in absence of retries) we want both operations to be done on the same
-   * connection (but separate transactions).  This avoid some flakiness in BONECP where if you
-   * perform an operation on 1 connection and immediately get another from the pool, the 2nd one
-   * doesn't see results of the first.
-   * 
-   * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case
-   * there will be a duplicate set of locks but both sets will belong to the same txn so they 
-   * will not conflict with each other.  For locks w/o txn context (i.e. read-only query), this
-   * may lead to deadlock (at least a long wait).  (e.g. 1st call creates locks in {@code LOCK_WAITING}
-   * mode and response gets lost.  Then {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient}
-   * retries, and enqueues another set of locks in LOCK_WAITING.  The 2nd LockResponse is delivered
-   * to the DbLockManager, which will keep dong {@link #checkLock(CheckLockRequest)} until the 1st
-   * set of locks times out.
-   */
-  @RetrySemantics.CannotRetry
-  public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException {
-    ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst);
-    try {
-      return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid());
-    }
-    catch(NoSuchLockException e) {
-      // This should never happen, as we just added the lock id
-      throw new MetaException("Couldn't find a lock we just 

<TRUNCATED>