Posted to commits@hive.apache.org by se...@apache.org on 2017/10/10 00:52:02 UTC

[28/61] [abbrv] hive git commit: HIVE-17561 Move TxnStore and implementations to standalone metastore (Alan Gates, reviewed by Eugene Koifman)

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
new file mode 100644
index 0000000..9fc3697
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.datasource;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * DataSourceProvider for the HikariCP connection pool.
+ */
+public class HikariCPDataSourceProvider implements DataSourceProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HikariCPDataSourceProvider.class);
+
+  public static final String HIKARI = "hikari";
+  private static final String CONNECTION_TIMEOUT_PROPERTY = "hikari.connectionTimeout";
+
+  @Override
+  public DataSource create(Configuration hdpConfig) throws SQLException {
+
+    LOG.debug("Creating Hikari connection pool for the MetaStore");
+
+    String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig);
+    String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig);
+    String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig);
+    int maxPoolSize = MetastoreConf.getIntVar(hdpConfig,
+        MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
+
+    Properties properties = replacePrefix(
+        DataSourceProvider.getPrefixedProperties(hdpConfig, HIKARI));
+    long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L);
+    HikariConfig config = null;
+    try {
+      config = new HikariConfig(properties);
+    } catch (Exception e) {
+      throw new SQLException("Cannot create HikariCP configuration: ", e);
+    }
+    config.setMaximumPoolSize(maxPoolSize);
+    config.setJdbcUrl(driverUrl);
+    config.setUsername(user);
+    config.setPassword(passwd);
+    //https://github.com/brettwooldridge/HikariCP
+    config.setConnectionTimeout(connectionTimeout);
+    return new HikariDataSource(config);
+  }
+
+  @Override
+  public boolean mayReturnClosedConnection() {
+    // Only BoneCP should return true
+    return false;
+  }
+
+  @Override
+  public boolean supports(Configuration configuration) {
+    String poolingType =
+        MetastoreConf.getVar(configuration,
+            MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
+    if (HIKARI.equals(poolingType)) {
+      int hikariPropsNr = DataSourceProvider.getPrefixedProperties(configuration, HIKARI).size();
+      LOG.debug("Found " + hikariPropsNr + " nr. of hikari specific configurations");
+      return hikariPropsNr > 0;
+    }
+    LOG.debug("Configuration requested " + poolingType + " pooling, HikariCpDSProvider exiting");
+    return false;
+  }
+
+  private Properties replacePrefix(Properties props) {
+    Properties newProps = new Properties();
+    props.forEach((key, value) ->
+        // strip the "hikari." prefix; the dot is escaped since replaceFirst expects a regex
+        newProps.put(key.toString().replaceFirst(HIKARI + "\\.", ""), value));
+    return newProps;
+  }
+}
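For context, here is a minimal self-contained sketch (not part of this change) of the convention the provider above implements; the connection settings are illustrative only. Configuration keys prefixed with "hikari." are stripped and handed to HikariConfig, while the JDBC URL, credentials, pool size and connection timeout come from the metastore configuration.

    import com.zaxxer.hikari.HikariConfig;
    import com.zaxxer.hikari.HikariDataSource;

    import javax.sql.DataSource;
    import java.util.Properties;

    public class HikariProviderSketch {
      public static void main(String[] args) {
        // A metastore key such as "hikari.idleTimeout" arrives here with the prefix stripped.
        Properties hikariProps = new Properties();
        hikariProps.setProperty("idleTimeout", "600000");

        HikariConfig config = new HikariConfig(hikariProps);
        config.setJdbcUrl("jdbc:derby:;databaseName=metastore_db;create=true"); // illustrative URL
        config.setUsername("APP");                                              // illustrative user
        config.setPassword("mine");                                             // illustrative password
        config.setMaximumPoolSize(10);
        config.setConnectionTimeout(30000L);

        DataSource ds = new HikariDataSource(config);
        System.out.println("Created pool: " + ds);
      }
    }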

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/datasource/package-info.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/datasource/package-info.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/datasource/package-info.java
new file mode 100644
index 0000000..9a4f22a
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/datasource/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * DataSource providers that can construct a connection pool from configuration
+ * properties in a Hadoop configuration object.
+ */
+package org.apache.hadoop.hive.metastore.datasource;

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
new file mode 100644
index 0000000..8268af9
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -0,0 +1,172 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.tools;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Helper class that generates SQL queries with syntax specific to the target DB.
+ * todo: why throw MetaException?
+ */
+@VisibleForTesting
+public final class SQLGenerator {
+  static final private Logger LOG = LoggerFactory.getLogger(SQLGenerator.class.getName());
+  private final DatabaseProduct dbProduct;
+  private final Configuration conf;
+
+  public SQLGenerator(DatabaseProduct dbProduct, Configuration conf) {
+    this.dbProduct = dbProduct;
+    this.conf = conf;
+  }
+
+  /**
+   * Generates "insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for the appropriate DB.
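+   * For Oracle, multi-row inserts are emitted in the form
+   * "insert all into T(a,b,c) values(1,2,'f') into T(a,b,c) values(3,4,'c') select * from dual".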
+   *
+   * @param tblColumns e.g. "T(a,b,c)"
+   * @param rows       e.g. list of Strings like 3,4,'d'
+   * @return fully formed INSERT INTO ... statements
+   */
+  public List<String> createInsertValuesStmt(String tblColumns, List<String> rows) {
+    if (rows == null || rows.size() == 0) {
+      return Collections.emptyList();
+    }
+    List<String> insertStmts = new ArrayList<>();
+    StringBuilder sb = new StringBuilder();
+    switch (dbProduct) {
+    case ORACLE:
+      if (rows.size() > 1) {
+        //http://www.oratable.com/oracle-insert-all/
+        //https://livesql.oracle.com/apex/livesql/file/content_BM1LJQ87M5CNIOKPOWPV6ZGR3.html
+        for (int numRows = 0; numRows < rows.size(); numRows++) {
+          if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) {
+            if (numRows > 0) {
+              sb.append(" select * from dual");
+              insertStmts.add(sb.toString());
+            }
+            sb.setLength(0);
+            sb.append("insert all ");
+          }
+          sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows))
+              .append(") ");
+        }
+        sb.append("select * from dual");
+        insertStmts.add(sb.toString());
+        return insertStmts;
+      }
+      //fall through
+    case DERBY:
+    case MYSQL:
+    case POSTGRES:
+    case SQLSERVER:
+      for (int numRows = 0; numRows < rows.size(); numRows++) {
+        if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) {
+          if (numRows > 0) {
+            insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma
+          }
+          sb.setLength(0);
+          sb.append("insert into ").append(tblColumns).append(" values");
+        }
+        sb.append('(').append(rows.get(numRows)).append("),");
+      }
+      insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma
+      return insertStmts;
+    default:
+      String msg = "Unrecognized database product name <" + dbProduct + ">";
+      LOG.error(msg);
+      throw new IllegalStateException(msg);
+    }
+  }
+
+  /**
+   * Given a {@code selectStatement}, decorate it with FOR UPDATE or a semantically equivalent
+   * construct.  If the DB doesn't support it, return the original select.
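+   * For example, "select a from T" becomes "select a from T for update" on MySQL, Oracle and
+   * Postgres, "select a from T with (updlock)" on SQL Server, and is returned unchanged on Derby.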
+   */
+  public String addForUpdateClause(String selectStatement) throws MetaException {
+    switch (dbProduct) {
+    case DERBY:
+      //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
+      //sadly in Derby, FOR UPDATE doesn't mean what it should
+      return selectStatement;
+    case MYSQL:
+      //http://dev.mysql.com/doc/refman/5.7/en/select.html
+    case ORACLE:
+      //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html
+    case POSTGRES:
+      //http://www.postgresql.org/docs/9.0/static/sql-select.html
+      return selectStatement + " for update";
+    case SQLSERVER:
+      //https://msdn.microsoft.com/en-us/library/ms189499.aspx
+      //https://msdn.microsoft.com/en-us/library/ms187373.aspx
+      String modifier = " with (updlock)";
+      int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
+      if (wherePos < 0) {
+        return selectStatement + modifier;
+      }
+      return selectStatement.substring(0, wherePos) + modifier +
+          selectStatement.substring(wherePos, selectStatement.length());
+    default:
+      String msg = "Unrecognized database product name <" + dbProduct + ">";
+      LOG.error(msg);
+      throw new MetaException(msg);
+    }
+  }
+
+  /**
+   * Suppose you have a query "select a,b from T" and you want to limit the result set
+   * to the first 5 rows.  The mechanism to do that differs in different DBs.
+   * Pass {@code noSelectsqlQuery} as "a,b from T" and this method will return the
+   * appropriately modified row limiting query.
+   * <p>
+   * Note that if {@code noSelectsqlQuery} contains a join, you must make sure that
+   * all columns are unique for Oracle.
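+   * For example, with {@code numRows} = 5 and {@code noSelectsqlQuery} = "a,b from T" this returns
+   * "select a,b from T fetch first 5 rows only" on Derby, "select a,b from T limit 5" on MySQL and
+   * Postgres, "select * from (select a,b from T) where rownum <= 5" on Oracle, and
+   * "select TOP(5) a,b from T" on SQL Server.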
+   */
+  public String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaException {
+    switch (dbProduct) {
+    case DERBY:
+      //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html
+      return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only";
+    case MYSQL:
+      //https://dev.mysql.com/doc/refman/5.0/en/select.html
+    case POSTGRES:
+      //http://www.postgresql.org/docs/7.3/static/queries-limit.html
+      return "select " + noSelectsqlQuery + " limit " + numRows;
+    case ORACLE:
+      //newer versions (12c and later) support OFFSET/FETCH
+      return "select * from (select " + noSelectsqlQuery + ") where rownum <= " + numRows;
+    case SQLSERVER:
+      //newer versions (2012 and later) support OFFSET/FETCH
+      //https://msdn.microsoft.com/en-us/library/ms189463.aspx
+      return "select TOP(" + numRows + ") " + noSelectsqlQuery;
+    default:
+      String msg = "Unrecognized database product name <" + dbProduct + ">";
+      LOG.error(msg);
+      throw new MetaException(msg);
+    }
+  }
+}
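A brief usage sketch of the API above (not part of this change); the Configuration is assumed to carry the usual metastore defaults, and the table and column names are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.DatabaseProduct;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.tools.SQLGenerator;

    import java.util.Arrays;
    import java.util.List;

    public class SQLGeneratorSketch {
      public static void main(String[] args) throws MetaException {
        SQLGenerator gen = new SQLGenerator(DatabaseProduct.MYSQL, new Configuration());

        // Multi-row insert, split into batches of DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE rows.
        List<String> inserts = gen.createInsertValuesStmt(
            "TXN_COMPONENTS (tc_txnid, tc_database, tc_table)",
            Arrays.asList("1, 'db1', 't1'", "2, 'db1', 't2'"));

        // Row locking and row limiting, rendered for the configured DatabaseProduct.
        String forUpdate = gen.addForUpdateClause("select cq_id from COMPACTION_QUEUE");
        String limited = gen.addLimitClause(5, "cq_id from COMPACTION_QUEUE");

        System.out.println(inserts);
        System.out.println(forUpdate);
        System.out.println(limited);
      }
    }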

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
new file mode 100644
index 0000000..1223b52
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.RunnableConfigurable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background thread that periodically updates the count of open transactions.
+ * Runs inside Hive Metastore Service.
+ */
+public class AcidOpenTxnsCounterService implements RunnableConfigurable {
+  private static final Logger LOG = LoggerFactory.getLogger(AcidOpenTxnsCounterService.class);
+
+  private Configuration conf;
+  private int isAliveCounter = 0;
+  private long lastLogTime = 0;
+
+  @Override
+  public void run() {
+    try {
+      long startTime = System.currentTimeMillis();
+      isAliveCounter++;
+      TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+      txnHandler.countOpenTxns();
+      if (System.currentTimeMillis() - lastLogTime > 60 * 1000) {
+        LOG.info("AcidOpenTxnsCounterService ran for " +
+            ((System.currentTimeMillis() - startTime) / 1000) +
+            " seconds.  isAliveCounter = " + isAliveCounter);
+        lastLogTime = System.currentTimeMillis();
+      }
+    }
+    catch(Throwable t) {
+      LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+    }
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}
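The scheduling of this service is wired up elsewhere in the metastore; the sketch below (not part of this change) only illustrates the intended usage, and the executor choice and one-second interval are assumptions rather than what the metastore actually configures:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class OpenTxnsCounterSketch {
      public static void main(String[] args) {
        AcidOpenTxnsCounterService counter = new AcidOpenTxnsCounterService();
        counter.setConf(new Configuration());  // the metastore passes its own Configuration here

        // Periodically invoke run(), which refreshes the open-transaction count via the TxnStore.
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        pool.scheduleAtFixedRate(counter, 0, 1, TimeUnit.SECONDS);
      }
    }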

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
new file mode 100644
index 0000000..41e428b
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.ValidCompactorTxnList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Information on a possible or running compaction.
+ */
+public class CompactionInfo implements Comparable<CompactionInfo> {
+  public long id;
+  public String dbname;
+  public String tableName;
+  public String partName;
+  char state;
+  public CompactionType type;
+  String workerId;
+  long start;
+  public String runAs;
+  public String properties;
+  public boolean tooManyAborts = false;
+  /**
+   * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) 
+   * See {@link TxnStore#setCompactionHighestTxnId(CompactionInfo, long)} for precise definition.
+   * See also {@link TxnUtils#createValidCompactTxnList(GetOpenTxnsInfoResponse)} and
+   * {@link ValidCompactorTxnList#highWatermark}
+   */
+  public long highestTxnId;
+  byte[] metaInfo;
+  String hadoopJobId;
+
+  private String fullPartitionName = null;
+  private String fullTableName = null;
+
+  public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) {
+    this.dbname = dbname;
+    this.tableName = tableName;
+    this.partName = partName;
+    this.type = type;
+  }
+  CompactionInfo(long id, String dbname, String tableName, String partName, char state) {
+    this(dbname, tableName, partName, null);
+    this.id = id;
+    this.state = state;
+  }
+  CompactionInfo() {}
+  
+  public String getFullPartitionName() {
+    if (fullPartitionName == null) {
+      StringBuilder buf = new StringBuilder(dbname);
+      buf.append('.');
+      buf.append(tableName);
+      if (partName != null) {
+        buf.append('.');
+        buf.append(partName);
+      }
+      fullPartitionName = buf.toString();
+    }
+    return fullPartitionName;
+  }
+
+  public String getFullTableName() {
+    if (fullTableName == null) {
+      StringBuilder buf = new StringBuilder(dbname);
+      buf.append('.');
+      buf.append(tableName);
+      fullTableName = buf.toString();
+    }
+    return fullTableName;
+  }
+  public boolean isMajorCompaction() {
+    return CompactionType.MAJOR == type;
+  }
+
+  @Override
+  public int compareTo(CompactionInfo o) {
+    return getFullPartitionName().compareTo(o.getFullPartitionName());
+  }
+  public String toString() {
+    return "id:" + id + "," +
+      "dbname:" + dbname + "," +
+      "tableName:" + tableName + "," +
+      "partName:" + partName + "," +
+      "state:" + state + "," +
+      "type:" + type + "," +
+      "properties:" + properties + "," +
+      "runAs:" + runAs + "," +
+      "tooManyAborts:" + tooManyAborts + "," +
+      "highestTxnId:" + highestTxnId;
+  }
+
+  /**
+   * Loads an object from a row of "select * from COMPACTION_QUEUE".
+   * @param rs ResultSet after call to rs.next()
+   * @throws SQLException
+   */
+  static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLException {
+    CompactionInfo fullCi = new CompactionInfo();
+    fullCi.id = rs.getLong(1);
+    fullCi.dbname = rs.getString(2);
+    fullCi.tableName = rs.getString(3);
+    fullCi.partName = rs.getString(4);
+    fullCi.state = rs.getString(5).charAt(0);//cq_state
+    fullCi.type = TxnHandler.dbCompactionType2ThriftType(rs.getString(6).charAt(0));
+    fullCi.properties = rs.getString(7);
+    fullCi.workerId = rs.getString(8);
+    fullCi.start = rs.getLong(9);
+    fullCi.runAs = rs.getString(10);
+    fullCi.highestTxnId = rs.getLong(11);
+    fullCi.metaInfo = rs.getBytes(12);
+    fullCi.hadoopJobId = rs.getString(13);
+    return fullCi;
+  }
+  static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException {
+    pStmt.setLong(1, ci.id);
+    pStmt.setString(2, ci.dbname);
+    pStmt.setString(3, ci.tableName);
+    pStmt.setString(4, ci.partName);
+    pStmt.setString(5, Character.toString(ci.state));
+    pStmt.setString(6, Character.toString(TxnHandler.thriftCompactionType2DbType(ci.type)));
+    pStmt.setString(7, ci.properties);
+    pStmt.setString(8, ci.workerId);
+    pStmt.setLong(9, ci.start);
+    pStmt.setLong(10, endTime);
+    pStmt.setString(11, ci.runAs);
+    pStmt.setLong(12, ci.highestTxnId);
+    pStmt.setBytes(13, ci.metaInfo);
+    pStmt.setString(14, ci.hadoopJobId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
new file mode 100644
index 0000000..e676b91
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -0,0 +1,961 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.classification.RetrySemantics;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Extends the transaction handler with methods needed only by the compactor threads.  These
+ * methods are not available through the thrift interface.
+ */
+class CompactionTxnHandler extends TxnHandler {
+  static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
+  static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  public CompactionTxnHandler() {
+  }
+
+  /**
+   * This will look through the completed_txn_components table for partitions or tables
+   * that may be ready for compaction.  Also, look through the txns and txn_components tables for
+   * aborted transactions that should be added to the list.
+   * @param maxAborted Maximum number of aborted transactions to allow before marking this as a
+   *                   potential compaction.
+   * @return list of CompactionInfo structs.  These will not have id, type,
+   * or runAs set since these are only potential compactions not actual ones.
+   */
+  @Override
+  @RetrySemantics.ReadOnly
+  public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+    Connection dbConn = null;
+    Set<CompactionInfo> response = new HashSet<>();
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        // Check for completed transactions
+        String s = "select distinct ctc_database, ctc_table, " +
+          "ctc_partition from COMPLETED_TXN_COMPONENTS";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          CompactionInfo info = new CompactionInfo();
+          info.dbname = rs.getString(1);
+          info.tableName = rs.getString(2);
+          info.partName = rs.getString(3);
+          response.add(info);
+        }
+        rs.close();
+
+        // Check for aborted txns
+        s = "select tc_database, tc_table, tc_partition " +
+          "from TXNS, TXN_COMPONENTS " +
+          "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
+          "group by tc_database, tc_table, tc_partition " +
+          "having count(*) > " + maxAborted;
+
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          CompactionInfo info = new CompactionInfo();
+          info.dbname = rs.getString(1);
+          info.tableName = rs.getString(2);
+          info.partName = rs.getString(3);
+          info.tooManyAborts = true;
+          response.add(info);
+        }
+
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database " + e.getMessage());
+        checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+      return response;
+    }
+    catch (RetryException e) {
+      return findPotentialCompactions(maxAborted);
+    }
+  }
+
+  /**
+   * Sets the user to run as.  This is for the case
+   * where the request was generated by the user and so the worker must set this value later.
+   * @param cq_id id of this entry in the queue
+   * @param user user to run the jobs as
+   */
+  @Override
+  @RetrySemantics.Idempotent
+  public void setRunAs(long cq_id, String user) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
+        LOG.debug("Going to execute update <" + s + ">");
+        int updCnt = stmt.executeUpdate(s);
+        if (updCnt != 1) {
+          LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ".  updCnt=" + updCnt);
+          LOG.debug("Going to rollback");
+          dbConn.rollback();
+        }
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to update compaction queue, " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user +")");
+      } finally {
+        closeDbConn(dbConn);
+        closeStmt(stmt);
+      }
+    } catch (RetryException e) {
+      setRunAs(cq_id, user);
+    }
+  }
+
+  /**
+   * This will grab the next compaction request off of
+   * the queue, and assign it to the worker.
+   * @param workerId id of the worker calling this, will be recorded in the db
+   * @return an info element for this compaction request, or null if there is no work to do now.
+   */
+  @Override
+  @RetrySemantics.SafeToRetry
+  public CompactionInfo findNextToCompact(String workerId) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
+      Statement updStmt = null;
+      ResultSet rs = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select cq_id, cq_database, cq_table, cq_partition, " +
+          "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          LOG.debug("No compactions found ready to compact");
+          dbConn.rollback();
+          return null;
+        }
+        updStmt = dbConn.createStatement();
+        do {
+          CompactionInfo info = new CompactionInfo();
+          info.id = rs.getLong(1);
+          info.dbname = rs.getString(2);
+          info.tableName = rs.getString(3);
+          info.partName = rs.getString(4);
+          info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+          info.properties = rs.getString(6);
+          // Now, update this record as being worked on by this worker.
+          long now = getDbTime(dbConn);
+          s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
+            "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
+            " AND cq_state='" + INITIATED_STATE + "'";
+          LOG.debug("Going to execute update <" + s + ">");
+          int updCount = updStmt.executeUpdate(s);
+          if(updCount == 1) {
+            dbConn.commit();
+            return info;
+          }
+          if(updCount == 0) {
+            LOG.debug("Another Worker picked up " + info);
+            continue;
+          }
+          LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
+            info + ". updCnt=" + updCount + ".");
+          dbConn.rollback();
+          return null;
+        } while( rs.next());
+        dbConn.rollback();
+        return null;
+      } catch (SQLException e) {
+        LOG.error("Unable to select next element for compaction, " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + ")");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        closeStmt(updStmt);
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return findNextToCompact(workerId);
+    }
+  }
+
+  /**
+   * This will mark an entry in the queue as compacted
+   * and put it in the ready to clean state.
+   * @param info info on the compaction entry to mark as compacted.
+   */
+  @Override
+  @RetrySemantics.SafeToRetry
+  public void markCompacted(CompactionInfo info) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
+          "cq_worker_id = null where cq_id = " + info.id;
+        LOG.debug("Going to execute update <" + s + ">");
+        int updCnt = stmt.executeUpdate(s);
+        if (updCnt != 1) {
+          LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". updCnt=" + updCnt);
+          LOG.debug("Going to rollback");
+          dbConn.rollback();
+        }
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to update compaction queue " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "markCompacted(" + info + ")");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      markCompacted(info);
+    }
+  }
+
+  /**
+   * Find entries in the queue that are ready to
+   * be cleaned.
+   * @return information on the entry in the queue.
+   */
+  @Override
+  @RetrySemantics.ReadOnly
+  public List<CompactionInfo> findReadyToClean() throws MetaException {
+    Connection dbConn = null;
+    List<CompactionInfo> rc = new ArrayList<>();
+
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select cq_id, cq_database, cq_table, cq_partition, " +
+          "cq_type, cq_run_as, cq_highest_txn_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        while (rs.next()) {
+          CompactionInfo info = new CompactionInfo();
+          info.id = rs.getLong(1);
+          info.dbname = rs.getString(2);
+          info.tableName = rs.getString(3);
+          info.partName = rs.getString(4);
+          switch (rs.getString(5).charAt(0)) {
+            case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
+            case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+            default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+          }
+          info.runAs = rs.getString(6);
+          info.highestTxnId = rs.getLong(7);
+          rc.add(info);
+        }
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        return rc;
+      } catch (SQLException e) {
+        LOG.error("Unable to select next element for cleaning, " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findReadyToClean");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return findReadyToClean();
+    }
+  }
+
+  /**
+   * This will remove an entry from the queue after
+   * it has been compacted.
+   * 
+   * @param info info on the compaction entry to remove
+   */
+  @Override
+  @RetrySemantics.CannotRetry
+  public void markCleaned(CompactionInfo info) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      PreparedStatement pStmt = null;
+      ResultSet rs = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
+        if(rs.next()) {
+          info = CompactionInfo.loadFullFromCompactionQueue(rs);
+        }
+        else {
+          throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
+        }
+        close(rs);
+        String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
+        LOG.debug("Going to execute update <" + s + ">");
+        int updCount = stmt.executeUpdate(s);
+        if (updCount != 1) {
+          LOG.error("Unable to delete compaction record: " + info +  ".  Update count=" + updCount);
+          LOG.debug("Going to rollback");
+          dbConn.rollback();
+        }
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+        info.state = SUCCEEDED_STATE;
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
+        updCount = pStmt.executeUpdate();
+
+        // Remove entries from completed_txn_components as well, so we don't start looking there
+        // again but only up to the highest txn ID included in this compaction job.
+        //highestTxnId will be NULL in upgrade scenarios
+        s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " +
+          "ctc_table = '" + info.tableName + "'";
+        if (info.partName != null) {
+          s += " and ctc_partition = '" + info.partName + "'";
+        }
+        if(info.highestTxnId != 0) {
+          s += " and ctc_txnid <= " + info.highestTxnId;
+        }
+        LOG.debug("Going to execute update <" + s + ">");
+        if (stmt.executeUpdate(s) < 1) {
+          LOG.error("Expected to remove at least one row from completed_txn_components when " +
+            "marking compaction entry as clean!");
+        }
+
+        s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
+          TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
+          info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId);
+        if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
+        LOG.debug("Going to execute update <" + s + ">");
+        rs = stmt.executeQuery(s);
+        List<Long> txnids = new ArrayList<>();
+        while (rs.next()) txnids.add(rs.getLong(1));
+        // Remove entries from txn_components, as there may be aborted txn components
+        if (txnids.size() > 0) {
+          List<String> queries = new ArrayList<>();
+
+          // Prepare prefix and suffix
+          StringBuilder prefix = new StringBuilder();
+          StringBuilder suffix = new StringBuilder();
+
+          prefix.append("delete from TXN_COMPONENTS where ");
+
+          //because 1 txn may include different partitions/tables even in auto commit mode
+          suffix.append(" and tc_database = ");
+          suffix.append(quoteString(info.dbname));
+          suffix.append(" and tc_table = ");
+          suffix.append(quoteString(info.tableName));
+          if (info.partName != null) {
+            suffix.append(" and tc_partition = ");
+            suffix.append(quoteString(info.partName));
+          }
+
+          // Populate the complete query with provided prefix and suffix
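+          // e.g. yields statements like:
+          // delete from TXN_COMPONENTS where (tc_txnid in (...)) and tc_database = '<db>' and tc_table = '<table>'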
+          TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
+
+          for (String query : queries) {
+            LOG.debug("Going to execute update <" + query + ">");
+            int rc = stmt.executeUpdate(query);
+            LOG.debug("Removed " + rc + " records from txn_components");
+
+            // Don't bother cleaning from the txns table.  A separate call will do that.  We don't
+            // know here which txns still have components from other tables or partitions in the
+            // table, so we don't know which ones we can and cannot clean.
+          }
+        }
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to delete from compaction queue " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "markCleaned(" + info + ")");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        closeStmt(pStmt);
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      markCleaned(info);
+    }
+  }
+
+  /**
+   * Clean up aborted transactions from txns that have no components in txn_components.  The reason such
+   * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
+   * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+   */
+  @Override
+  @RetrySemantics.SafeToRetry
+  public void cleanEmptyAbortedTxns() throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      ResultSet rs = null;
+      try {
+        //Aborted is a terminal state, so nothing about the txn can change
+        //after that, so READ COMMITTED is sufficient.
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "select txn_id from TXNS where " +
+          "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
+          "txn_state = '" + TXN_ABORTED + "'";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        List<Long> txnids = new ArrayList<>();
+        while (rs.next()) txnids.add(rs.getLong(1));
+        close(rs);
+        if(txnids.size() <= 0) {
+          return;
+        }
+        Collections.sort(txnids);//easier to read logs
+        List<String> queries = new ArrayList<>();
+        StringBuilder prefix = new StringBuilder();
+        StringBuilder suffix = new StringBuilder();
+
+        prefix.append("delete from TXNS where ");
+        suffix.append("");
+
+        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
+
+        for (String query : queries) {
+          LOG.debug("Going to execute update <" + query + ">");
+          int rc = stmt.executeUpdate(query);
+          LOG.info("Removed " + rc + "  empty Aborted transactions from TXNS");
+        }
+        LOG.info("Aborted transactions removed from TXNS: " + txnids);
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to delete from txns table " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "cleanEmptyAbortedTxns");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      cleanEmptyAbortedTxns();
+    }
+  }
+
+  /**
+   * This will take all entries assigned to workers
+   * on a host and return them to INITIATED state.  The initiator should use this at start up to
+   * clean entries from any workers that were in the middle of compacting when the metastore
+   * shut down.  It does not reset entries from worker threads on other hosts as those may still
+   * be working.
+   * @param hostname Name of this host.  It is assumed this prefixes the thread's worker id,
+   *                 so that a LIKE 'hostname%' pattern will match the worker id.
+   */
+  @Override
+  @RetrySemantics.Idempotent
+  public void revokeFromLocalWorkers(String hostname) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
+          + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
+          +  hostname + "%'";
+        LOG.debug("Going to execute update <" + s + ">");
+        // It isn't an error if the following returns no rows, as the local workers could have died
+        // with  nothing assigned to them.
+        stmt.executeUpdate(s);
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to change dead worker's records back to initiated state " +
+          e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + hostname +")");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      revokeFromLocalWorkers(hostname);
+    }
+  }
+
+  /**
+   * This call will return all compaction queue
+   * entries assigned to a worker but whose start time has exceeded the timeout back to the initiated state.
+   * This should be called by the initiator on start up and occasionally when running to clean up
+   * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} should be called
+   * first.
+   * @param timeout number of milliseconds since start time that should elapse before a worker is
+   *                declared dead.
+   */
+  @Override
+  @RetrySemantics.Idempotent
+  public void revokeTimedoutWorkers(long timeout) throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        long latestValidStart = getDbTime(dbConn) - timeout;
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
+          + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
+          +  latestValidStart;
+        LOG.debug("Going to execute update <" + s + ">");
+        // It isn't an error if the following returns no rows, as the local workers could have died
+        // with  nothing assigned to them.
+        stmt.executeUpdate(s);
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.error("Unable to change dead worker's records back to initiated state " +
+          e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + ")");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        closeStmt(stmt);
+        closeDbConn(dbConn);
+      }
+    } catch (RetryException e) {
+      revokeTimedoutWorkers(timeout);
+    }
+  }
+
+  /**
+   * Queries metastore DB directly to find columns in the table which have statistics information.
+   * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+   * table level stats are examined.
+   * @throws MetaException
+   */
+  @Override
+  @RetrySemantics.ReadOnly
+  public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        String quote = getIdentifierQuoteString(dbConn);
+        stmt = dbConn.createStatement();
+        StringBuilder bldr = new StringBuilder();
+        bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
+          .append(" FROM ")
+          .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS"))
+          .append(quote)
+          .append(" WHERE ")
+          .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname)
+          .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
+          .append(" = '").append(ci.tableName).append("'");
+        if (ci.partName != null) {
+          bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
+            .append(ci.partName).append("'");
+        }
+        String s = bldr.toString();
+
+      /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" :
+          "PART_COL_STATS")
+         + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+        + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/
+        LOG.debug("Going to execute <" + s + ">");
+        rs = stmt.executeQuery(s);
+        List<String> columns = new ArrayList<>();
+        while (rs.next()) {
+          columns.add(rs.getString(1));
+        }
+        LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
+          (ci.partName == null ? "" : "/" + ci.partName));
+        dbConn.commit();
+        return columns;
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName +
+          (ci.partName == null ? "" : "/" + ci.partName) + ")");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException ex) {
+      return findColumnsWithStats(ci);
+    }
+  }
+
+  /**
+   * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+   * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids.
+   */
+  @Override
+  @RetrySemantics.Idempotent
+  public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_TXN_ID = " + highestTxnId +
+          " WHERE CQ_ID = " + ci.id);
+        if(updCount != 1) {
+          throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci);
+        }
+        dbConn.commit();
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "setCompactionHighestTxnId(" + ci + "," + highestTxnId + ")");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(null, stmt, dbConn);
+      }
+    } catch (RetryException ex) {
+      setCompactionHighestTxnId(ci, highestTxnId);
+    }
+  }
+  private static class RetentionCounters {
+    int attemptedRetention = 0;
+    int failedRetention = 0;
+    int succeededRetention = 0;
+    RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) {
+      this.attemptedRetention = attemptedRetention;
+      this.failedRetention = failedRetention;
+      this.succeededRetention = succeededRetention;
+    }
+  }
+  private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) {
+    switch (ci.state) {
+      case ATTEMPTED_STATE:
+        if(--rc.attemptedRetention < 0) {
+          deleteSet.add(ci.id);
+        }
+        break;
+      case FAILED_STATE:
+        if(--rc.failedRetention < 0) {
+          deleteSet.add(ci.id);
+        }
+        break;
+      case SUCCEEDED_STATE:
+        if(--rc.succeededRetention < 0) {
+          deleteSet.add(ci.id);
+        }
+        break;
+      default:
+        //do nothing to handle future RU/D where we may want to add new state types
+    }
+  }
+
+  /**
+   * For any given compactable entity (partition; table if not partitioned) the history of compactions
+   * may look like "sssfffaaasffss", for example.  The idea is to retain the tail (most recent) of the
+   * history such that a configurable number of each type of state is present.  Any other entries
+   * can be purged.  This scheme has advantage of always retaining the last failure/success even if
+   * it's not recent.
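+   * For example, with a retention of 2 per state, only the 2 most recent succeeded, 2 most recent
+   * failed and 2 most recent attempted entries are kept for each entity; older entries of each
+   * state are purged.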
+   * @throws MetaException
+   */
+  @Override
+  @RetrySemantics.SafeToRetry
+  public void purgeCompactionHistory() throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    List<Long> deleteSet = new ArrayList<>();
+    RetentionCounters rc = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        /*cc_id is monotonically increasing, so for any entity it sorts in order of compaction history;
+        thus this query groups by entity and within each group sorts most recent first*/
+        rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " +
+          "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc");
+        String lastCompactedEntity = null;
+        /*In each group, walk from most recent and count occurrences of each state type.  Once you
+        * have counted enough (for each state) to satisfy the retention policy, delete all other
+        * instances of this status.*/
+        while(rs.next()) {
+          CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
+          if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
+            lastCompactedEntity = ci.getFullPartitionName();
+            rc = new RetentionCounters(MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+              getFailedCompactionRetention(),
+              MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
+          }
+          checkForDeletion(deleteSet, ci, rc);
+        }
+        close(rs);
+
+        if (deleteSet.size() <= 0) {
+          return;
+        }
+
+        List<String> queries = new ArrayList<>();
+
+        StringBuilder prefix = new StringBuilder();
+        StringBuilder suffix = new StringBuilder();
+
+        prefix.append("delete from COMPLETED_COMPACTIONS where ");
+        suffix.append("");
+
+        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
+
+        for (String query : queries) {
+          LOG.debug("Going to execute update <" + query + ">");
+          int count = stmt.executeUpdate(query);
+          LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS");
+        }
+        dbConn.commit();
+      } catch (SQLException e) {
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "purgeCompactionHistory()");
+        throw new MetaException("Unable to connect to transaction database " +
+          StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException ex) {
+      purgeCompactionHistory();
+    }
+  }
+  /**
+   * Ensures that the number of failed compaction entries retained is at least the failed
+   * compaction threshold beyond which no new automatic compactions are scheduled.
+   */
+  private int getFailedCompactionRetention() {
+    int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+    int failedRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
+    if(failedRetention < failedThreshold) {
+      LOG.warn("Invalid configuration " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname +
+        "=" + failedRetention + " < " + ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED + "=" +
+        failedRetention + ".  Will use " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.varname +
+        "=" + failedRetention);
+      failedRetention = failedThreshold;
+    }
+    return failedRetention;
+  }
+  /**
+   * Returns {@code true} if there already exists a sufficient number of consecutive failures for
+   * this table/partition so that no new automatic compactions will be scheduled.
+   * User initiated compactions don't do this check.
+   *
+   * Do we allow compacting a whole table (when it's partitioned)?  No, though perhaps we should.
+   * That would be a meta operation, i.e. first find all partitions for this table (which have
+   * txn info) and schedule each compaction separately.  This avoids complications in this logic.
+   */
+  @Override
+  @RetrySemantics.ReadOnly
+  public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " +
+          "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
+          "CC_TABLE = " + quoteString(ci.tableName) +
+          (ci.partName != null ? " and CC_PARTITION = " + quoteString(ci.partName) : "") +
+          " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
+        int numFailed = 0;
+        int numTotal = 0;
+        int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
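+        // Walk the most recent non-ATTEMPTED compactions, newest first, looking at no more than
+        // failedThreshold of them; numFailed can only equal failedThreshold if all of them failed.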
+        while(rs.next() && ++numTotal <= failedThreshold) {
+          if(rs.getString(1).charAt(0) == FAILED_STATE) {
+            numFailed++;
+          }
+          else {
+            numFailed--;
+          }
+        }
+        return numFailed == failedThreshold;
+      }
+      catch (SQLException e) {
+        LOG.error("Unable to delete from compaction queue " + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
+        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+        return false;//weren't able to check
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return checkFailedCompactions(ci);
+    }
+  }
+  /**
+   * If there is an entry in compaction_queue with ci.id, remove it and make an entry in
+   * completed_compactions with status 'f' (failed).
+   * If there is no entry in compaction_queue, it means the Initiator failed to even schedule a
+   * compaction, which we record as an ATTEMPTED_STATE entry in the history.
+   */
+  @Override
+  @RetrySemantics.CannotRetry
+  public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
+    //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      PreparedStatement pStmt = null;
+      ResultSet rs = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
+        if(rs.next()) {
+          ci = CompactionInfo.loadFullFromCompactionQueue(rs);
+          String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
+          LOG.debug("Going to execute update <" + s + ">");
+          int updCnt = stmt.executeUpdate(s);
+        }
+        else {
+          if(ci.id > 0) {
+            //the record with valid CQ_ID has disappeared - this is a sign of something wrong
+            throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+          }
+        }
+        if(ci.id == 0) {
+          //The failure occurred before we even made an entry in COMPACTION_QUEUE
+          //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+          ci.id = generateCompactionQueueId(stmt);
+          //mostly this indicates that the Initiator is paying attention to some table even though
+          //compactions are not happening.
+          ci.state = ATTEMPTED_STATE;
+          //this is not strictly accurate, but 'type' cannot be null.
+          if(ci.type == null) { ci.type = CompactionType.MINOR; }
+          ci.start = getDbTime(dbConn);
+        }
+        else {
+          ci.state = FAILED_STATE;
+        }
+        close(rs, stmt, null);
+
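+        // Record the terminal state of this compaction (FAILED, or ATTEMPTED if it was never
+        // queued) in the history table.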
+        pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        LOG.debug("Going to commit");
+        closeStmt(pStmt);
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        try {
+          checkRetryable(dbConn, e, "markFailed(" + ci + ")");
+        }
+        catch(MetaException ex) {
+          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+        }
+        LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
+      } finally {
+        close(rs, stmt, null);
+        close(null, pStmt, dbConn);
+      }
+    } catch (RetryException e) {
+      markFailed(ci);
+    }
+  }
+  @Override
+  @RetrySemantics.Idempotent
+  public void setHadoopJobId(String hadoopJobId, long id) {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+        LOG.debug("Going to execute <" + s + ">");
+        int updateCount = stmt.executeUpdate(s);
+        LOG.debug("Going to commit");
+        closeStmt(stmt);
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        try {
+          checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
+        }
+        catch(MetaException ex) {
+          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+        }
+        LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
+      } finally {
+        close(null, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      setHadoopJobId(hadoopJobId, id);
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/hive/blob/f4a12a56/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
new file mode 100644
index 0000000..d09c958
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLTransactionRollbackException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for creating and destroying txn database/schema, plus methods for
+ * querying against metastore tables.
+ * Placed here in a separate class so it can be shared across unit tests.
+ */
+public final class TxnDbUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName());
+  private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+  private static int deadlockCnt = 0;
+
+  private TxnDbUtil() {
+    throw new UnsupportedOperationException("Can't initialize class");
+  }
+
+  /**
+   * Set up the configuration so it will use the DbTxnManager, concurrency will be set to true,
+   * and the JDBC configs will be set for putting the transaction and lock info in the embedded
+   * metastore.
+   *
+   * @param conf Configuration to add these values to
+   */
+  public static void setConfValues(Configuration conf) {
+    MetastoreConf.setVar(conf, ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER);
+    MetastoreConf.setBoolVar(conf, ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+  }
+
+  public static void prepDb(Configuration conf) throws Exception {
+    // This is a bogus hack because it copies the contents of the SQL file
+    // intended for creating derby databases, and thus will inexorably get
+    // out of date with it.  I'm open to any suggestions on how to make this
+    // read the file in a build friendly way.
+
+    Connection conn = null;
+    Statement stmt = null;
+    try {
+      conn = getConnection(conf);
+      stmt = conn.createStatement();
+      stmt.execute("CREATE TABLE TXNS (" +
+          "  TXN_ID bigint PRIMARY KEY," +
+          "  TXN_STATE char(1) NOT NULL," +
+          "  TXN_STARTED bigint NOT NULL," +
+          "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
+          "  TXN_USER varchar(128) NOT NULL," +
+          "  TXN_HOST varchar(128) NOT NULL)");
+
+      stmt.execute("CREATE TABLE TXN_COMPONENTS (" +
+          "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
+          "  TC_DATABASE varchar(128) NOT NULL," +
+          "  TC_TABLE varchar(128)," +
+          "  TC_PARTITION varchar(767)," +
+          "  TC_OPERATION_TYPE char(1) NOT NULL)");
+      stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
+          "  CTC_TXNID bigint," +
+          "  CTC_DATABASE varchar(128) NOT NULL," +
+          "  CTC_TABLE varchar(128)," +
+          "  CTC_PARTITION varchar(767))");
+      stmt.execute("CREATE TABLE NEXT_TXN_ID (" + "  NTXN_NEXT bigint NOT NULL)");
+      stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
+      stmt.execute("CREATE TABLE HIVE_LOCKS (" +
+          " HL_LOCK_EXT_ID bigint NOT NULL," +
+          " HL_LOCK_INT_ID bigint NOT NULL," +
+          " HL_TXNID bigint," +
+          " HL_DB varchar(128) NOT NULL," +
+          " HL_TABLE varchar(128)," +
+          " HL_PARTITION varchar(767)," +
+          " HL_LOCK_STATE char(1) NOT NULL," +
+          " HL_LOCK_TYPE char(1) NOT NULL," +
+          " HL_LAST_HEARTBEAT bigint NOT NULL," +
+          " HL_ACQUIRED_AT bigint," +
+          " HL_USER varchar(128) NOT NULL," +
+          " HL_HOST varchar(128) NOT NULL," +
+          " HL_HEARTBEAT_COUNT integer," +
+          " HL_AGENT_INFO varchar(128)," +
+          " HL_BLOCKEDBY_EXT_ID bigint," +
+          " HL_BLOCKEDBY_INT_ID bigint," +
+        " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+      stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
+
+      stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");
+      stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
+
+      stmt.execute("CREATE TABLE COMPACTION_QUEUE (" +
+          " CQ_ID bigint PRIMARY KEY," +
+          " CQ_DATABASE varchar(128) NOT NULL," +
+          " CQ_TABLE varchar(128) NOT NULL," +
+          " CQ_PARTITION varchar(767)," +
+          " CQ_STATE char(1) NOT NULL," +
+          " CQ_TYPE char(1) NOT NULL," +
+          " CQ_TBLPROPERTIES varchar(2048)," +
+          " CQ_WORKER_ID varchar(128)," +
+          " CQ_START bigint," +
+          " CQ_RUN_AS varchar(128)," +
+          " CQ_HIGHEST_TXN_ID bigint," +
+          " CQ_META_INFO varchar(2048) for bit data," +
+          " CQ_HADOOP_JOB_ID varchar(32))");
+
+      stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
+      stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+      
+      stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
+        " CC_ID bigint PRIMARY KEY," +
+        " CC_DATABASE varchar(128) NOT NULL," +
+        " CC_TABLE varchar(128) NOT NULL," +
+        " CC_PARTITION varchar(767)," +
+        " CC_STATE char(1) NOT NULL," +
+        " CC_TYPE char(1) NOT NULL," +
+        " CC_TBLPROPERTIES varchar(2048)," +
+        " CC_WORKER_ID varchar(128)," +
+        " CC_START bigint," +
+        " CC_END bigint," +
+        " CC_RUN_AS varchar(128)," +
+        " CC_HIGHEST_TXN_ID bigint," +
+        " CC_META_INFO varchar(2048) for bit data," +
+        " CC_HADOOP_JOB_ID varchar(32))");
+      
+      stmt.execute("CREATE TABLE AUX_TABLE (" +
+        " MT_KEY1 varchar(128) NOT NULL," +
+        " MT_KEY2 bigint NOT NULL," +
+        " MT_COMMENT varchar(255)," +
+        " PRIMARY KEY(MT_KEY1, MT_KEY2))");
+      
+      stmt.execute("CREATE TABLE WRITE_SET (" +
+        " WS_DATABASE varchar(128) NOT NULL," +
+        " WS_TABLE varchar(128) NOT NULL," +
+        " WS_PARTITION varchar(767)," +
+        " WS_TXNID bigint NOT NULL," +
+        " WS_COMMIT_ID bigint NOT NULL," +
+        " WS_OPERATION_TYPE char(1) NOT NULL)"
+      );
+    } catch (SQLException e) {
+      try {
+        conn.rollback();
+      } catch (SQLException re) {
+        LOG.error("Error rolling back: " + re.getMessage());
+      }
+
+      // This might be a deadlock, if so, let's retry
+      if (e instanceof SQLTransactionRollbackException && deadlockCnt++ < 5) {
+        LOG.warn("Caught deadlock, retrying db creation");
+        prepDb(conf);
+      } else {
+        throw e;
+      }
+    } finally {
+      deadlockCnt = 0;
+      closeResources(conn, stmt, null);
+    }
+  }
+
+  public static void cleanDb(Configuration conf) throws Exception {
+    int retryCount = 0;
+    while(++retryCount <= 3) {
+      boolean success = true;
+      Connection conn = null;
+      Statement stmt = null;
+      try {
+        conn = getConnection(conf);
+        stmt = conn.createStatement();
+
+        // We want to try these, whether they succeed or fail.
+        try {
+          stmt.execute("DROP INDEX HL_TXNID_INDEX");
+        } catch (SQLException e) {
+          if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) {
+            //42X65/30000 means the index doesn't exist
+            LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() +
+              "State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount);
+            success = false;
+          }
+        }
+
+        success &= dropTable(stmt, "TXN_COMPONENTS", retryCount);
+        success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount);
+        success &= dropTable(stmt, "TXNS", retryCount);
+        success &= dropTable(stmt, "NEXT_TXN_ID", retryCount);
+        success &= dropTable(stmt, "HIVE_LOCKS", retryCount);
+        success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount);
+        success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount);
+        success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount);
+        success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount);
+        success &= dropTable(stmt, "AUX_TABLE", retryCount);
+        success &= dropTable(stmt, "WRITE_SET", retryCount);
+      } finally {
+        closeResources(conn, stmt, null);
+      }
+      if(success) {
+        return;
+      }
+    }
+    throw new RuntimeException("Failed to clean up txn tables");
+  }
+
+  private static boolean dropTable(Statement stmt, String name, int retryCount) throws SQLException {
+    for (int i = 0; i < 3; i++) {
+      try {
+        stmt.execute("DROP TABLE " + name);
+        LOG.debug("Successfully dropped table " + name);
+        return true;
+      } catch (SQLException e) {
+        if ("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) {
+          LOG.debug("Not dropping " + name + " because it doesn't exist");
+          //failed because object doesn't exist
+          return true;
+        }
+        if ("X0Y25".equals(e.getSQLState()) && 30000 == e.getErrorCode()) {
+          // Intermittent failure
+          LOG.warn("Intermittent drop failure, retrying, try number " + i);
+          continue;
+        }
+        LOG.error("Unable to drop table " + name + ": " + e.getMessage() +
+            " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount);
+      }
+    }
+    LOG.error("Failed to drop table, don't know why");
+    return false;
+  }
+
+  /**
+   * A tool to count the number of partitions, tables,
+   * and databases locked by a particular lockId.
+   *
+   * @param conf configuration used to obtain the metastore DB connection
+   * @param lockId lock id to look for lock components
+   *
+   * @return number of components, or 0 if there is no lock
+   */
+  public static int countLockComponents(Configuration conf, long lockId) throws Exception {
+    Connection conn = null;
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      conn = getConnection(conf);
+      stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?");
+      stmt.setLong(1, lockId);
+      rs = stmt.executeQuery();
+      if (!rs.next()) {
+        return 0;
+      }
+      return rs.getInt(1);
+    } finally {
+      closeResources(conn, stmt, rs);
+    }
+  }
+
+  /**
+   * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables
+   * @param conf configuration used to obtain the metastore DB connection
+   * @param countQuery COUNT query text
+   * @return the query result
+   * @throws Exception
+   */
+  public static int countQueryAgent(Configuration conf, String countQuery) throws Exception {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      conn = getConnection(conf);
+      stmt = conn.createStatement();
+      rs = stmt.executeQuery(countQuery);
+      if (!rs.next()) {
+        return 0;
+      }
+      return rs.getInt(1);
+    } finally {
+      closeResources(conn, stmt, rs);
+    }
+  }
+  @VisibleForTesting
+  public static String queryToString(Configuration conf, String query) throws Exception {
+    return queryToString(conf, query, true);
+  }
+  public static String queryToString(Configuration conf, String query, boolean includeHeader)
+      throws Exception {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    StringBuilder sb = new StringBuilder();
+    try {
+      conn = getConnection(conf);
+      stmt = conn.createStatement();
+      rs = stmt.executeQuery(query);
+      ResultSetMetaData rsmd = rs.getMetaData();
+      if(includeHeader) {
+        for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+          sb.append(rsmd.getColumnName(colPos)).append("   ");
+        }
+        sb.append('\n');
+      }
+      while(rs.next()) {
+        for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+          sb.append(rs.getObject(colPos)).append("   ");
+        }
+        sb.append('\n');
+      }
+    } finally {
+      closeResources(conn, stmt, rs);
+    }
+    return sb.toString();
+  }
+
+  static Connection getConnection(Configuration conf) throws Exception {
+    String jdbcDriver = MetastoreConf.getVar(conf, ConfVars.CONNECTION_DRIVER);
+    Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
+    Properties prop = new Properties();
+    String driverUrl = MetastoreConf.getVar(conf, ConfVars.CONNECTURLKEY);
+    String user = MetastoreConf.getVar(conf, ConfVars.CONNECTION_USER_NAME);
+    String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
+    prop.setProperty("user", user);
+    prop.setProperty("password", passwd);
+    Connection conn = driver.connect(driverUrl, prop);
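+    // Autocommit so each statement issued by these test helpers is committed immediately.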
+    conn.setAutoCommit(true);
+    return conn;
+  }
+
+  static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
+    if (rs != null) {
+      try {
+        rs.close();
+      } catch (SQLException e) {
+        LOG.error("Error closing ResultSet: " + e.getMessage());
+      }
+    }
+
+    if (stmt != null) {
+      try {
+        stmt.close();
+      } catch (SQLException e) {
+        System.err.println("Error closing Statement: " + e.getMessage());
+      }
+    }
+
+    if (conn != null) {
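+      // Roll back any in-flight transaction so nothing uncommitted is left open when we close.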
+      try {
+        conn.rollback();
+      } catch (SQLException e) {
+        System.err.println("Error rolling back: " + e.getMessage());
+      }
+      try {
+        conn.close();
+      } catch (SQLException e) {
+        System.err.println("Error closing Connection: " + e.getMessage());
+      }
+    }
+  }
+}