Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:27 UTC

[18/50] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0719

http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
index 0000000,4a97f89..267c9e8
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
@@@ -1,0 -1,155 +1,162 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.metastore.model;
+ 
+ import java.util.List;
+ import java.util.Map;
+ 
+ public class MPartition {
+ 
+   private String partitionName; // partitionname ==>  (key=value/)*(key=value)
+   private MTable table; 
+   private List<String> values;
+   private int createTime;
+   private int lastAccessTime;
+   private MStorageDescriptor sd;
+   private Map<String, String> parameters;
 -  
++  private long writeId;
+   
+   public MPartition() {}
+   
+   /**
+    * @param partitionName
+    * @param table
+    * @param values
+    * @param createTime
+    * @param lastAccessTime
+    * @param sd
+    * @param parameters
+    */
+   public MPartition(String partitionName, MTable table, List<String> values, int createTime,
+       int lastAccessTime, MStorageDescriptor sd, Map<String, String> parameters) {
+     this.partitionName = partitionName;
+     this.table = table;
+     this.values = values;
+     this.createTime = createTime;
+     this.lastAccessTime = lastAccessTime;
+     this.sd = sd;
+     this.parameters = parameters;
+   }
+ 
+   /**
+    * @return the lastAccessTime
+    */
+   public int getLastAccessTime() {
+     return lastAccessTime;
+   }
+ 
+   /**
+    * @param lastAccessTime the lastAccessTime to set
+    */
+   public void setLastAccessTime(int lastAccessTime) {
+     this.lastAccessTime = lastAccessTime;
+   }
+ 
+   /**
+    * @return the values
+    */
+   public List<String> getValues() {
+     return values;
+   }
+ 
+   /**
+    * @param values the values to set
+    */
+   public void setValues(List<String> values) {
+     this.values = values;
+   }
+ 
+   /**
+    * @return the table
+    */
+   public MTable getTable() {
+     return table;
+   }
+ 
+   /**
+    * @param table the table to set
+    */
+   public void setTable(MTable table) {
+     this.table = table;
+   }
+ 
+   /**
+    * @return the sd
+    */
+   public MStorageDescriptor getSd() {
+     return sd;
+   }
+ 
+   /**
+    * @param sd the sd to set
+    */
+   public void setSd(MStorageDescriptor sd) {
+     this.sd = sd;
+   }
+ 
+   /**
+    * @return the parameters
+    */
+   public Map<String, String> getParameters() {
+     return parameters;
+   }
+ 
+   /**
+    * @param parameters the parameters to set
+    */
+   public void setParameters(Map<String, String> parameters) {
+     this.parameters = parameters;
+   }
+ 
+   /**
+    * @return the partitionName
+    */
+   public String getPartitionName() {
+     return partitionName;
+   }
+ 
+   /**
+    * @param partitionName the partitionName to set
+    */
+   public void setPartitionName(String partitionName) {
+     this.partitionName = partitionName;
+   }
+ 
+   /**
+    * @return the createTime
+    */
+   public int getCreateTime() {
+     return createTime;
+   }
+ 
+   /**
+    * @param createTime the createTime to set
+    */
+   public void setCreateTime(int createTime) {
+     this.createTime = createTime;
+   }
+ 
++  public long getWriteId() {
++    return writeId;
++  }
++
++  public void setWriteId(long writeId) {
++    this.writeId = writeId;
++  }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
index 0000000,38ad479..deeb971
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
@@@ -1,0 -1,273 +1,283 @@@
++
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.metastore.model;
+ 
+ import java.util.List;
+ import java.util.Map;
+ 
+ public class MTable {
+   
+   private String tableName;
+   private MDatabase database;
+   private MStorageDescriptor sd;
+   private String owner;
+   private String ownerType;
+   private int createTime;
+   private int lastAccessTime;
+   private int retention;
+   private List<MFieldSchema> partitionKeys;
+   private Map<String, String> parameters;
+   private String viewOriginalText;
+   private String viewExpandedText;
+   private boolean rewriteEnabled;
+   private String tableType;
++  private long writeId;
+ 
+   public MTable() {}
+ 
+   /**
+    * @param tableName
+    * @param database
+    * @param sd
+    * @param owner
+    * @param ownerType
+    * @param createTime
+    * @param lastAccessTime
+    * @param retention
+    * @param partitionKeys
+    * @param parameters
+    * @param viewOriginalText
+    * @param viewExpandedText
+    * @param tableType
+    */
+   public MTable(String tableName, MDatabase database, MStorageDescriptor sd, String owner, String ownerType,
+       int createTime, int lastAccessTime, int retention, List<MFieldSchema> partitionKeys,
+       Map<String, String> parameters, String viewOriginalText, String viewExpandedText,
+       boolean rewriteEnabled, String tableType) {
+     this.tableName = tableName;
+     this.database = database;
+     this.sd = sd;
+     this.owner = owner;
+     this.ownerType = ownerType;
+     this.createTime = createTime;
+     this.setLastAccessTime(lastAccessTime);
+     this.retention = retention;
+     this.partitionKeys = partitionKeys;
+     this.parameters = parameters;
+     this.viewOriginalText = viewOriginalText;
+     this.viewExpandedText = viewExpandedText;
+     this.rewriteEnabled = rewriteEnabled;
+     this.tableType = tableType;
+   }
+ 
+   /**
+    * @return the tableName
+    */
+   public String getTableName() {
+     return tableName;
+   }
+ 
+   /**
+    * @param tableName the tableName to set
+    */
+   public void setTableName(String tableName) {
+     this.tableName = tableName;
+   }
+ 
+   /**
+    * @return the sd
+    */
+   public MStorageDescriptor getSd() {
+     return sd;
+   }
+ 
+   /**
+    * @param sd the sd to set
+    */
+   public void setSd(MStorageDescriptor sd) {
+     this.sd = sd;
+   }
+ 
+   /**
+    * @return the partKeys
+    */
+   public List<MFieldSchema> getPartitionKeys() {
+     return partitionKeys;
+   }
+ 
+   /**
+    * @param partKeys the partKeys to set
+    */
+   public void setPartitionKeys(List<MFieldSchema> partKeys) {
+     this.partitionKeys = partKeys;
+   }
+ 
+   /**
+    * @return the parameters
+    */
+   public Map<String, String> getParameters() {
+     return parameters;
+   }
+ 
+   /**
+    * @param parameters the parameters to set
+    */
+   public void setParameters(Map<String, String> parameters) {
+     this.parameters = parameters;
+   }
+ 
+   /**
+    * @return the original view text, or null if this table is not a view
+    */
+   public String getViewOriginalText() {
+     return viewOriginalText;
+   }
+ 
+   /**
+    * @param viewOriginalText the original view text to set
+    */
+   public void setViewOriginalText(String viewOriginalText) {
+     this.viewOriginalText = viewOriginalText;
+   }
+ 
+   /**
+    * @return the expanded view text, or null if this table is not a view
+    */
+   public String getViewExpandedText() {
+     return viewExpandedText;
+   }
+ 
+   /**
+    * @param viewExpandedText the expanded view text to set
+    */
+   public void setViewExpandedText(String viewExpandedText) {
+     this.viewExpandedText = viewExpandedText;
+   }
+ 
+   /**
+    * @return whether the view can be used for rewriting queries
+    */
+   public boolean isRewriteEnabled() {
+     return rewriteEnabled;
+   }
+ 
+   /**
+    * @param rewriteEnabled whether the view can be used for rewriting queries
+    */
+   public void setRewriteEnabled(boolean rewriteEnabled) {
+     this.rewriteEnabled = rewriteEnabled;
+   }
+ 
+   /**
+    * @return the owner
+    */
+   public String getOwner() {
+     return owner;
+   }
+ 
+   /**
+    * @param owner the owner to set
+    */
+   public void setOwner(String owner) {
+     this.owner = owner;
+   }
+ 
+   /**
+    * @return the owner type
+    */
+   public String getOwnerType() {
+     return ownerType;
+   }
+ 
+   /**
+    * @param ownerType the owner type to set
+    */
+   public void setOwnerType(String ownerType) {
+     this.ownerType = ownerType;
+   }
+ 
+   /**
+    * @return the createTime
+    */
+   public int getCreateTime() {
+     return createTime;
+   }
+ 
+   /**
+    * @param createTime the createTime to set
+    */
+   public void setCreateTime(int createTime) {
+     this.createTime = createTime;
+   }
+ 
+   /**
+    * @return the database
+    */
+   public MDatabase getDatabase() {
+     return database;
+   }
+ 
+   /**
+    * @param database the database to set
+    */
+   public void setDatabase(MDatabase database) {
+     this.database = database;
+   }
+ 
+   /**
+    * @return the retention
+    */
+   public int getRetention() {
+     return retention;
+   }
+ 
+   /**
+    * @param retention the retention to set
+    */
+   public void setRetention(int retention) {
+     this.retention = retention;
+   }
+ 
+   /**
+    * @param lastAccessTime the lastAccessTime to set
+    */
+   public void setLastAccessTime(int lastAccessTime) {
+     this.lastAccessTime = lastAccessTime;
+   }
+ 
+   /**
+    * @return the lastAccessTime
+    */
+   public int getLastAccessTime() {
+     return lastAccessTime;
+   }
+ 
+   /**
+    * @param tableType the tableType to set
+    */
+   public void setTableType(String tableType) {
+     this.tableType = tableType;
+   }
+ 
+   /**
+    * @return the tableType
+    */
+   public String getTableType() {
+     return tableType;
+   }
++
++  public long getWriteId() {
++    return writeId;
++  }
++
++  public void setWriteId(long writeId) {
++    this.writeId = writeId;
++  }
+ }
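
The only functional addition to MPartition and MTable above is the new writeId field with plain getter/setter accessors. A minimal sketch of how the field could be exercised (hypothetical values; the real callers in the metastore persistence layer are not shown in these hunks):

    // Hypothetical usage sketch, not part of the commit; MPartition/MTable are the
    // model classes from the hunks above.
    MTable mtbl = new MTable();
    mtbl.setTableName("t1");
    mtbl.setWriteId(42L);              // hypothetical write ID value
    MPartition mpart = new MPartition();
    mpart.setTable(mtbl);
    mpart.setWriteId(42L);             // partitions carry the same new field
    long writeId = mpart.getWriteId(); // 42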

http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 0000000,4e3068d..1f559e9
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@@ -1,0 -1,1107 +1,1158 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hive.metastore.txn;
+ 
++import org.apache.hadoop.hive.common.StatsSetupConst;
+ import org.apache.hadoop.hive.common.classification.RetrySemantics;
+ import org.apache.hadoop.hive.metastore.api.CompactionType;
+ import org.apache.hadoop.hive.metastore.api.MetaException;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.util.StringUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.sql.Connection;
+ import java.sql.PreparedStatement;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ 
+ /**
+  * Extends the transaction handler with methods needed only by the compactor threads.  These
+  * methods are not available through the thrift interface.
+  */
+ class CompactionTxnHandler extends TxnHandler {
+   static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
+   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ 
+   public CompactionTxnHandler() {
+   }
+ 
+   /**
+    * This will look through the completed_txn_components table and look for partitions or tables
+    * that may be ready for compaction.  Also, look through txns and txn_components tables for
+    * aborted transactions that we should add to the list.
+    * @param maxAborted Maximum number of aborted transactions to allow before marking this as a
+    *                   potential compaction.
+    * @return list of CompactionInfo structs.  These will not have id, type,
+    * or runAs set since these are only potential compactions not actual ones.
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+     Connection dbConn = null;
+     Set<CompactionInfo> response = new HashSet<>();
+     Statement stmt = null;
+     ResultSet rs = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         // Check for completed transactions
+         String s = "select distinct ctc_database, ctc_table, " +
+           "ctc_partition from COMPLETED_TXN_COMPONENTS";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         while (rs.next()) {
+           CompactionInfo info = new CompactionInfo();
+           info.dbname = rs.getString(1);
+           info.tableName = rs.getString(2);
+           info.partName = rs.getString(3);
+           response.add(info);
+         }
+         rs.close();
+ 
+         // Check for aborted txns
+         s = "select tc_database, tc_table, tc_partition " +
+           "from TXNS, TXN_COMPONENTS " +
+           "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
+           "group by tc_database, tc_table, tc_partition " +
+           "having count(*) > " + maxAborted;
+ 
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         while (rs.next()) {
+           CompactionInfo info = new CompactionInfo();
+           info.dbname = rs.getString(1);
+           info.tableName = rs.getString(2);
+           info.partName = rs.getString(3);
+           info.tooManyAborts = true;
+           response.add(info);
+         }
+ 
+         LOG.debug("Going to rollback");
+         dbConn.rollback();
+       } catch (SQLException e) {
+         LOG.error("Unable to connect to transaction database " + e.getMessage());
+         checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+       return response;
+     }
+     catch (RetryException e) {
+       return findPotentialCompactions(maxAborted);
+     }
+   }
+ 
+   /**
+    * Sets the user to run as.  This is for the case
+    * where the request was generated by the user and so the worker must set this value later.
+    * @param cq_id id of this entry in the queue
+    * @param user user to run the jobs as
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public void setRunAs(long cq_id, String user) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
+         LOG.debug("Going to execute update <" + s + ">");
+         int updCnt = stmt.executeUpdate(s);
+         if (updCnt != 1) {
+           LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ".  updCnt=" + updCnt);
+           LOG.debug("Going to rollback");
+           dbConn.rollback();
+         }
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to update compaction queue, " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user +")");
+       } finally {
+         closeDbConn(dbConn);
+         closeStmt(stmt);
+       }
+     } catch (RetryException e) {
+       setRunAs(cq_id, user);
+     }
+   }
+ 
+   /**
+    * This will grab the next compaction request off of
+    * the queue, and assign it to the worker.
+    * @param workerId id of the worker calling this, will be recorded in the db
+    * @return an info element for this compaction request, or null if there is no work to do now.
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public CompactionInfo findNextToCompact(String workerId) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
+       Statement updStmt = null;
+       ResultSet rs = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "select cq_id, cq_database, cq_table, cq_partition, " +
+           "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (!rs.next()) {
+           LOG.debug("No compactions found ready to compact");
+           dbConn.rollback();
+           return null;
+         }
+         updStmt = dbConn.createStatement();
+         do {
+           CompactionInfo info = new CompactionInfo();
+           info.id = rs.getLong(1);
+           info.dbname = rs.getString(2);
+           info.tableName = rs.getString(3);
+           info.partName = rs.getString(4);
+           info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+           info.properties = rs.getString(6);
+           // Now, update this record as being worked on by this worker.
+           long now = getDbTime(dbConn);
+           s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
+             "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
+             " AND cq_state='" + INITIATED_STATE + "'";
+           LOG.debug("Going to execute update <" + s + ">");
+           int updCount = updStmt.executeUpdate(s);
+           if(updCount == 1) {
+             dbConn.commit();
+             return info;
+           }
+           if(updCount == 0) {
+             LOG.debug("Another Worker picked up " + info);
+             continue;
+           }
+           LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
+             info + ". updCnt=" + updCount + ".");
+           dbConn.rollback();
+           return null;
+         } while( rs.next());
+         dbConn.rollback();
+         return null;
+       } catch (SQLException e) {
+         LOG.error("Unable to select next element for compaction, " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         closeStmt(updStmt);
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return findNextToCompact(workerId);
+     }
+   }
+ 
+   /**
+    * This will mark an entry in the queue as compacted
+    * and put it in the ready to clean state.
+    * @param info info on the compaction entry to mark as compacted.
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void markCompacted(CompactionInfo info) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
+           "cq_worker_id = null where cq_id = " + info.id;
+         LOG.debug("Going to execute update <" + s + ">");
+         int updCnt = stmt.executeUpdate(s);
+         if (updCnt != 1) {
+           LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". updCnt=" + updCnt);
+           LOG.debug("Going to rollback");
+           dbConn.rollback();
+         }
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to update compaction queue " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "markCompacted(" + info + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         closeStmt(stmt);
+         closeDbConn(dbConn);
+       }
+     } catch (RetryException e) {
+       markCompacted(info);
+     }
+   }
+ 
+   /**
+    * Find entries in the queue that are ready to
+    * be cleaned.
+    * @return information on the entry in the queue.
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public List<CompactionInfo> findReadyToClean() throws MetaException {
+     Connection dbConn = null;
+     List<CompactionInfo> rc = new ArrayList<>();
+ 
+     Statement stmt = null;
+     ResultSet rs = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "select cq_id, cq_database, cq_table, cq_partition, "
+                 + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '"
+                 + READY_FOR_CLEANING + "'";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         while (rs.next()) {
+           CompactionInfo info = new CompactionInfo();
+           info.id = rs.getLong(1);
+           info.dbname = rs.getString(2);
+           info.tableName = rs.getString(3);
+           info.partName = rs.getString(4);
+           switch (rs.getString(5).charAt(0)) {
+             case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
+             case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+             default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+           }
+           info.runAs = rs.getString(6);
+           info.highestWriteId = rs.getLong(7);
+           rc.add(info);
+         }
+         LOG.debug("Going to rollback");
+         dbConn.rollback();
+         return rc;
+       } catch (SQLException e) {
+         LOG.error("Unable to select next element for cleaning, " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "findReadyToClean");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return findReadyToClean();
+     }
+   }
+ 
+   /**
+    * This will remove an entry from the queue after
+    * it has been compacted.
+    * 
+    * @param info info on the compaction entry to remove
+    */
+   @Override
+   @RetrySemantics.CannotRetry
+   public void markCleaned(CompactionInfo info) throws MetaException {
+     try {
+       Connection dbConn = null;
+       PreparedStatement pStmt = null;
+       ResultSet rs = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+         pStmt.setLong(1, info.id);
+         rs = pStmt.executeQuery();
+         if(rs.next()) {
+           info = CompactionInfo.loadFullFromCompactionQueue(rs);
+         }
+         else {
+           throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
+         }
+         close(rs);
+         String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+         pStmt = dbConn.prepareStatement(s);
+         pStmt.setLong(1, info.id);
+         LOG.debug("Going to execute update <" + s + ">");
+         int updCount = pStmt.executeUpdate();
+         if (updCount != 1) {
+           LOG.error("Unable to delete compaction record: " + info +  ".  Update count=" + updCount);
+           LOG.debug("Going to rollback");
+           dbConn.rollback();
+         }
+         pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+         info.state = SUCCEEDED_STATE;
+         CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
+         updCount = pStmt.executeUpdate();
+ 
+         // Remove entries from completed_txn_components as well, so we don't start looking there
+         // again but only up to the highest write ID included in this compaction job.
+         //highestWriteId will be NULL in upgrade scenarios
+         s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " +
+             "ctc_table = ?";
+         if (info.partName != null) {
+           s += " and ctc_partition = ?";
+         }
+         if(info.highestWriteId != 0) {
+           s += " and ctc_writeid <= ?";
+         }
+         pStmt = dbConn.prepareStatement(s);
+         int paramCount = 1;
+         pStmt.setString(paramCount++, info.dbname);
+         pStmt.setString(paramCount++, info.tableName);
+         if (info.partName != null) {
+           pStmt.setString(paramCount++, info.partName);
+         }
+         if(info.highestWriteId != 0) {
+           pStmt.setLong(paramCount++, info.highestWriteId);
+         }
+         LOG.debug("Going to execute update <" + s + ">");
+         if (pStmt.executeUpdate() < 1) {
+           LOG.error("Expected to remove at least one row from completed_txn_components when " +
+             "marking compaction entry as clean!");
+         }
+ 
+         s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
+           TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
+         if (info.highestWriteId != 0) s += " and tc_writeid <= ?";
+         if (info.partName != null) s += " and tc_partition = ?";
+ 
+         pStmt = dbConn.prepareStatement(s);
+         paramCount = 1;
+         pStmt.setString(paramCount++, info.dbname);
+         pStmt.setString(paramCount++, info.tableName);
+         if(info.highestWriteId != 0) {
+           pStmt.setLong(paramCount++, info.highestWriteId);
+         }
+         if (info.partName != null) {
+           pStmt.setString(paramCount++, info.partName);
+         }
+ 
+         LOG.debug("Going to execute update <" + s + ">");
+         rs = pStmt.executeQuery();
+         List<Long> txnids = new ArrayList<>();
+         List<String> questions = new ArrayList<>();
+         while (rs.next()) {
+           long id = rs.getLong(1);
+           txnids.add(id);
+           questions.add("?");
+         }
+         // Remove entries from txn_components, as there may be aborted txn components
+         if (txnids.size() > 0) {
+           List<String> queries = new ArrayList<>();
+ 
+           // Prepare prefix and suffix
+           StringBuilder prefix = new StringBuilder();
+           StringBuilder suffix = new StringBuilder();
+ 
+           prefix.append("delete from TXN_COMPONENTS where ");
+ 
+           //because 1 txn may include different partitions/tables even in auto commit mode
+           suffix.append(" and tc_database = ?");
+           suffix.append(" and tc_table = ?");
+           if (info.partName != null) {
+             suffix.append(" and tc_partition = ?");
+           }
+ 
+           // Populate the complete query with provided prefix and suffix
+           List<Integer> counts = TxnUtils
+               .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid",
+                   true, false);
+           int totalCount = 0;
+           for (int i = 0; i < queries.size(); i++) {
+             String query = queries.get(i);
+             int insertCount = counts.get(i);
+ 
+             LOG.debug("Going to execute update <" + query + ">");
+             pStmt = dbConn.prepareStatement(query);
+             for (int j = 0; j < insertCount; j++) {
+               pStmt.setLong(j + 1, txnids.get(totalCount + j));
+             }
+             totalCount += insertCount;
+             paramCount = insertCount + 1;
+             pStmt.setString(paramCount++, info.dbname);
+             pStmt.setString(paramCount++, info.tableName);
+             if (info.partName != null) {
+               pStmt.setString(paramCount++, info.partName);
+             }
+             int rc = pStmt.executeUpdate();
+             LOG.debug("Removed " + rc + " records from txn_components");
+ 
+             // Don't bother cleaning from the txns table.  A separate call will do that.  We don't
+             // know here which txns still have components from other tables or partitions in the
+             // table, so we don't know which ones we can and cannot clean.
+           }
+         }
+ 
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to delete from compaction queue " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "markCleaned(" + info + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, pStmt, dbConn);
+       }
+     } catch (RetryException e) {
+       markCleaned(info);
+     }
+   }
+ 
+   /**
+    * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommitted_txnid as found by
+    * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void cleanTxnToWriteIdTable() throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet rs = null;
+ 
+       try {
+         // We query for minimum values in all the queries, and those minimums can only increase
+         // under concurrent operations. So, READ COMMITTED is sufficient.
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+ 
+         // First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
+         // If there are no txns which are currently open or aborted in the system, then current value of
+         // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
+         String s = "select ntxn_next from NEXT_TXN_ID";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (!rs.next()) {
+           throw new MetaException("Transaction tables not properly " +
+                   "initialized, no record found in next_txn_id");
+         }
+         long minUncommittedTxnId = rs.getLong(1);
+ 
+         // If there are any open txns, then the minimum of min_open_txnid from MIN_HISTORY_LEVEL table
+         // could be the min_uncommitted_txnid if lesser than NEXT_TXN_ID.ntxn_next.
+         s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (rs.next()) {
+           long minOpenTxnId = rs.getLong(1);
+           if (minOpenTxnId > 0) {
+             minUncommittedTxnId = Math.min(minOpenTxnId, minUncommittedTxnId);
+           }
+         }
+ 
+         // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid
+         // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid).
+         s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED);
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         if (rs.next()) {
+           long minAbortedTxnId = rs.getLong(1);
+           if (minAbortedTxnId > 0) {
+             minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId);
+           }
+         }
+ 
+         // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed
+         // to clean up the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table.
+         s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId;
+         LOG.debug("Going to execute delete <" + s + ">");
+         int rc = stmt.executeUpdate(s);
+         LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId);
+ 
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to delete from txns table " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "cleanTxnToWriteIdTable");
+         throw new MetaException("Unable to connect to transaction database " +
+                 StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       cleanTxnToWriteIdTable();
+     }
+   }
+ 
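
To make the low-water-mark arithmetic above concrete, here is a small sketch (illustrative only, not part of the patch) of how min_uncommitted_txnid is derived from the three values the method reads; a value of 0 stands for "no rows returned":

    // Sketch of the computation performed inline above; the helper name is hypothetical.
    static long minUncommittedTxnId(long ntxnNext, long minOpenTxnId, long minAbortedTxnId) {
      long min = ntxnNext;                      // NEXT_TXN_ID.ntxn_next
      if (minOpenTxnId > 0) {                   // min(MIN_HISTORY_LEVEL.mhl_min_open_txnid)
        min = Math.min(minOpenTxnId, min);
      }
      if (minAbortedTxnId > 0) {                // min(TXNS.txn_id) over aborted txns
        min = Math.min(minAbortedTxnId, min);
      }
      return min;                               // e.g. (100, 97, 95) -> 95
    }

With a result of 95, the delete issued above becomes "delete from TXN_TO_WRITE_ID where t2w_txnid < 95".
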
+   /**
+    * Clean up aborted transactions from txns that have no components in txn_components. The reason such
+    * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
+    * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void cleanEmptyAbortedTxns() throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       ResultSet rs = null;
+       try {
+         //Aborted is a terminal state, so nothing about the txn can change
+         //after that, so READ COMMITTED is sufficient.
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "select txn_id from TXNS where " +
 -          "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
 -          "txn_state = '" + TXN_ABORTED + "'";
++            "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
++            "txn_state = '" + TXN_ABORTED + "'";
+         LOG.debug("Going to execute query <" + s + ">");
+         rs = stmt.executeQuery(s);
+         List<Long> txnids = new ArrayList<>();
+         while (rs.next()) txnids.add(rs.getLong(1));
+         close(rs);
+         if(txnids.size() <= 0) {
+           return;
+         }
+         Collections.sort(txnids);//easier to read logs
++
+         List<String> queries = new ArrayList<>();
+         StringBuilder prefix = new StringBuilder();
+         StringBuilder suffix = new StringBuilder();
+ 
++        // Turn off COLUMN_STATS_ACCURATE for txnids' components in TBLS and PARTITIONS
++        prefix.append("select tbl_id from TBLS inner join DBS on TBLS.DB_ID = DBS.DB_ID "
++            + "inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and t2w_table = TBLS.TBL_NAME"
++            + " and t2w_writeid = TBLS.WRITE_ID where ");
++        suffix.append("");
++        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "t2w_txnid", true, false);
++
++        // Delete COLUMN_STATS_ACCURATE.BASIC_STATS rows from TABLE_PARAMS for the txnids.
++        List<StringBuilder> finalCommands = new ArrayList<>(queries.size());
++        for (int i = 0; i < queries.size(); i++) {
++          String query = queries.get(i);
++          finalCommands.add(i, new StringBuilder("delete from TABLE_PARAMS " +
++                  " where param_key = '" + "COLUMN_STATS_ACCURATE" + "' and tbl_id in ("));
++          finalCommands.get(i).append(query + ")");
++          LOG.debug("Going to execute update <" + finalCommands.get(i) + ">");
++          int rc = stmt.executeUpdate(finalCommands.get(i).toString());
++          LOG.info("Turned off " + rc + " COLUMN_STATS_ACCURATE.BASIC_STATS states from TBLS");
++        }
++
++        queries.clear();
++        prefix.setLength(0);
++        suffix.setLength(0);
++        finalCommands.clear();
++
++        // Delete COLUMN_STATS_ACCURATE.BASIC_STATS rows from PARTITION_PARAMS for the txnids.
++        prefix.append("select part_id from PARTITIONS "
++            + "inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID "
++            + "inner join DBS on TBLS.DB_ID = DBS.DB_ID "
++            + "inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and t2w_table = TBLS.TBL_NAME"
++            + " and t2w_writeid = TBLS.WRITE_ID where ");
++        suffix.append("");
++        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "t2w_txnid", true, false);
++
++        for (int i = 0; i < queries.size(); i++) {
++          String query = queries.get(i);
++          finalCommands.add(i, new StringBuilder("delete from PARTITION_PARAMS " +
++                  " where param_key = '" + "COLUMN_STATS_ACCURATE" + "' and part_id in ("));
++          finalCommands.get(i).append(query + ")");
++          LOG.debug("Going to execute update <" + finalCommands.get(i) + ">");
++          int rc = stmt.executeUpdate(finalCommands.get(i).toString());
++          LOG.info("Turned off " + rc + " COLUMN_STATS_ACCURATE.BASIC_STATS states from PARTITIONS");
++        }
++
++        queries.clear();
++        prefix.setLength(0);
++        suffix.setLength(0);
++        finalCommands.clear();
++
++        // Delete from TXNS.
+         prefix.append("delete from TXNS where ");
+         suffix.append("");
+ 
+         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
+ 
+         for (String query : queries) {
+           LOG.debug("Going to execute update <" + query + ">");
+           int rc = stmt.executeUpdate(query);
+           LOG.info("Removed " + rc + "  empty Aborted transactions from TXNS");
+         }
+         LOG.info("Aborted transactions removed from TXNS: " + txnids);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to delete from txns table " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "cleanEmptyAbortedTxns");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       cleanEmptyAbortedTxns();
+     }
+   }
+ 
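For readers following the new statistics-invalidation step added to cleanEmptyAbortedTxns above, the assembled table-level statement looks roughly like: delete from TABLE_PARAMS where param_key = 'COLUMN_STATS_ACCURATE' and tbl_id in (select tbl_id from TBLS inner join DBS on TBLS.DB_ID = DBS.DB_ID inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and t2w_table = TBLS.TBL_NAME and t2w_writeid = TBLS.WRITE_ID where t2w_txnid in (...)). The exact IN-clause batching and parenthesization are handled by TxnUtils.buildQueryWithINClause, the txn ids shown as (...) are the aborted ids collected earlier, and the PARTITION_PARAMS variant is analogous.
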
+   /**
+    * This will take all entries assigned to workers
+    * on a host and return them to INITIATED state.  The initiator should use this at start up to
+    * clean entries from any workers that were in the middle of compacting when the metastore
+    * shut down.  It does not reset entries from worker threads on other hosts as those may still
+    * be working.
+    * @param hostname Name of this host.  It is assumed this prefixes the thread's worker id,
+    *                 so that a LIKE pattern of hostname% will match the worker id.
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public void revokeFromLocalWorkers(String hostname) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
+           + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
+           +  hostname + "%'";
+         LOG.debug("Going to execute update <" + s + ">");
+         // It isn't an error if the following returns no rows, as the local workers could have died
+         // with  nothing assigned to them.
+         stmt.executeUpdate(s);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to change dead worker's records back to initiated state " +
+           e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + hostname +")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         closeStmt(stmt);
+         closeDbConn(dbConn);
+       }
+     } catch (RetryException e) {
+       revokeFromLocalWorkers(hostname);
+     }
+   }
+ 
+   /**
+    * This call will move all compaction queue
+    * entries that are assigned to a worker but have exceeded the timeout back to the initiated state.
+    * This should be called by the initiator on start up and occasionally when running to clean up
+    * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} should be called
+    * first.
+    * @param timeout number of milliseconds since start time that should elapse before a worker is
+    *                declared dead.
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public void revokeTimedoutWorkers(long timeout) throws MetaException {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         long latestValidStart = getDbTime(dbConn) - timeout;
+         stmt = dbConn.createStatement();
+         String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
+           + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
+           +  latestValidStart;
+         LOG.debug("Going to execute update <" + s + ">");
+         // It isn't an error if the following returns no rows, as the local workers could have died
+         // with  nothing assigned to them.
+         stmt.executeUpdate(s);
+         LOG.debug("Going to commit");
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.error("Unable to change dead worker's records back to initiated state " +
+           e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         closeStmt(stmt);
+         closeDbConn(dbConn);
+       }
+     } catch (RetryException e) {
+       revokeTimedoutWorkers(timeout);
+     }
+   }
+ 
+   /**
+    * Queries metastore DB directly to find columns in the table which have statistics information.
+    * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+    * table level stats are examined.
+    * @throws MetaException
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
+     Connection dbConn = null;
+     PreparedStatement pStmt = null;
+     ResultSet rs = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         String quote = getIdentifierQuoteString(dbConn);
+         StringBuilder bldr = new StringBuilder();
+         bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
+           .append(" FROM ")
+           .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS"))
+           .append(quote)
+           .append(" WHERE ")
+           .append(quote).append("DB_NAME").append(quote).append(" = ?")
+           .append(" AND ").append(quote).append("TABLE_NAME").append(quote)
+           .append(" = ?");
+         if (ci.partName != null) {
+           bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?");
+         }
+         String s = bldr.toString();
+         pStmt = dbConn.prepareStatement(s);
+         pStmt.setString(1, ci.dbname);
+         pStmt.setString(2, ci.tableName);
+         if (ci.partName != null) {
+           pStmt.setString(3, ci.partName);
+         }
+ 
+       /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" :
+           "PART_COL_STATS")
+          + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+         + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/
+         LOG.debug("Going to execute <" + s + ">");
+         rs = pStmt.executeQuery();
+         List<String> columns = new ArrayList<>();
+         while (rs.next()) {
+           columns.add(rs.getString(1));
+         }
+         LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
+           (ci.partName == null ? "" : "/" + ci.partName));
+         dbConn.commit();
+         return columns;
+       } catch (SQLException e) {
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName +
+           (ci.partName == null ? "" : "/" + ci.partName) + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, pStmt, dbConn);
+       }
+     } catch (RetryException ex) {
+       return findColumnsWithStats(ci);
+     }
+   }
+ 
+   /**
+    * Record the highest write id that the {@code ci} compaction job will pay attention to.
+    * This is the highest resolved txn id, i.e. such that there are no open txns with lower ids.
+    */
+   @Override
+   @RetrySemantics.Idempotent
+   public void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException {
+     Connection dbConn = null;
+     Statement stmt = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + highestWriteId +
+           " WHERE CQ_ID = " + ci.id);
+         if(updCount != 1) {
+           throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci);
+         }
+         dbConn.commit();
+       } catch (SQLException e) {
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + highestWriteId + ")");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(null, stmt, dbConn);
+       }
+     } catch (RetryException ex) {
+       setCompactionHighestWriteId(ci, highestWriteId);
+     }
+   }
+   private static class RetentionCounters {
+     int attemptedRetention = 0;
+     int failedRetention = 0;
+     int succeededRetention = 0;
+     RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) {
+       this.attemptedRetention = attemptedRetention;
+       this.failedRetention = failedRetention;
+       this.succeededRetention = succeededRetention;
+     }
+   }
+   private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) {
+     switch (ci.state) {
+       case ATTEMPTED_STATE:
+         if(--rc.attemptedRetention < 0) {
+           deleteSet.add(ci.id);
+         }
+         break;
+       case FAILED_STATE:
+         if(--rc.failedRetention < 0) {
+           deleteSet.add(ci.id);
+         }
+         break;
+       case SUCCEEDED_STATE:
+         if(--rc.succeededRetention < 0) {
+           deleteSet.add(ci.id);
+         }
+         break;
+       default:
+         //do nothing to handle future RU/D where we may want to add new state types
+     }
+   }
+ 
+   /**
+    * For any given compactable entity (partition; table if not partitioned) the history of compactions
+    * may look like "sssfffaaasffss", for example.  The idea is to retain the tail (most recent) of the
+    * history such that a configurable number of each type of state is present.  Any other entries
+    * can be purged.  This scheme has the advantage of always retaining the last failure/success even if
+    * it's not recent.
+    * @throws MetaException
+    */
+   @Override
+   @RetrySemantics.SafeToRetry
+   public void purgeCompactionHistory() throws MetaException {
+     Connection dbConn = null;
+     Statement stmt = null;
+     PreparedStatement pStmt = null;
+     ResultSet rs = null;
+     List<Long> deleteSet = new ArrayList<>();
+     RetentionCounters rc = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
+         /*cc_id is monotonically increasing so for any entity it sorts in order of compaction history,
+         thus this query groups by entity and within each group sorts most recent first*/
+         rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " +
+           "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc");
+         String lastCompactedEntity = null;
+         /*In each group, walk from most recent and count occurrences of each state type.  Once you
+         * have counted enough (for each state) to satisfy retention policy, delete all other
+         * instances of this status.*/
+         while(rs.next()) {
+           CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
+           if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
+             lastCompactedEntity = ci.getFullPartitionName();
+             rc = new RetentionCounters(MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+               getFailedCompactionRetention(),
+               MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
+           }
+           checkForDeletion(deleteSet, ci, rc);
+         }
+         close(rs);
+ 
+         if (deleteSet.size() <= 0) {
+           return;
+         }
+ 
+         List<String> queries = new ArrayList<>();
+ 
+         StringBuilder prefix = new StringBuilder();
+         StringBuilder suffix = new StringBuilder();
+ 
+         prefix.append("delete from COMPLETED_COMPACTIONS where ");
+         suffix.append("");
+ 
+         List<String> questions = new ArrayList<>(deleteSet.size());
+         for (int  i = 0; i < deleteSet.size(); i++) {
+           questions.add("?");
+         }
+         List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false);
+         int totalCount = 0;
+         for (int i = 0; i < queries.size(); i++) {
+           String query = queries.get(i);
+           long insertCount = counts.get(i);
+           LOG.debug("Going to execute update <" + query + ">");
+           pStmt = dbConn.prepareStatement(query);
+           for (int j = 0; j < insertCount; j++) {
+             pStmt.setLong(j + 1, deleteSet.get(totalCount + j));
+           }
+           totalCount += insertCount;
+           int count = pStmt.executeUpdate();
+           LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS");
+         }
+         dbConn.commit();
+       } catch (SQLException e) {
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "purgeCompactionHistory()");
+         throw new MetaException("Unable to connect to transaction database " +
+           StringUtils.stringifyException(e));
+       } finally {
+         close(rs, stmt, dbConn);
+         closeStmt(pStmt);
+       }
+     } catch (RetryException ex) {
+       purgeCompactionHistory();
+     }
+   }
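
The retention walk described in the javadoc above ("sssfffaaasffss") can be illustrated with a tiny standalone simulation; the history, the retention of 2 per state, and the class name below are hypothetical, but the counting mirrors checkForDeletion:

    import java.util.ArrayList;
    import java.util.List;

    public class RetentionWalkDemo {
      public static void main(String[] args) {
        // Per-entity history, most recent first; 's' = succeeded, 'f' = failed, 'a' = attempted.
        char[] history = {'s','s','f','f','s','a','a','a','f','f','f','s','s','s'};
        int succeeded = 2, failed = 2, attempted = 2;  // hypothetical retention per state
        List<Integer> deleteSet = new ArrayList<>();
        for (int i = 0; i < history.length; i++) {
          switch (history[i]) {
            case 's': if (--succeeded < 0) deleteSet.add(i); break;
            case 'f': if (--failed < 0) deleteSet.add(i); break;
            case 'a': if (--attempted < 0) deleteSet.add(i); break;
          }
        }
        // Prints [4, 7, 8, 9, 10, 11, 12, 13]: only the two most recent entries
        // of each state survive; everything older gets purged.
        System.out.println(deleteSet);
      }
    }
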
+   /**
+    * This ensures that the number of failed compaction entries retained is greater than the
+    * failed compaction threshold which prevents new compactions from being scheduled.
+    */
+   private int getFailedCompactionRetention() {
+     int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+     int failedRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
+     if(failedRetention < failedThreshold) {
+       LOG.warn("Invalid configuration " + ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.getVarname() +
+         "=" + failedRetention + " < " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname() + "=" +
+         failedThreshold + ".  Will use " + ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED.getVarname() +
+         "=" + failedThreshold);
+       failedRetention = failedThreshold;
+     }
+     return failedRetention;
+   }
+   /**
+    * Returns {@code true} if there is already a sufficient number of consecutive failures for
+    * this table/partition, so that no new automatic compactions will be scheduled.
+    * User-initiated compactions don't do this check.
+    *
+    * Do we allow compacting a whole table (when it's partitioned)?  No, though perhaps we should.
+    * That would be a meta operation, i.e. first find all partitions of this table (which have
+    * txn info) and schedule each compaction separately.  This avoids complications in this logic.
+    */
+   @Override
+   @RetrySemantics.ReadOnly
+   public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
+     Connection dbConn = null;
+     PreparedStatement pStmt = null;
+     ResultSet rs = null;
+     try {
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " +
+           "CC_DATABASE = ? and " +
+           "CC_TABLE = ? " +
+           (ci.partName != null ? "and CC_PARTITION = ?" : "") +
+           " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
+         pStmt.setString(1, ci.dbname);
+         pStmt.setString(2, ci.tableName);
+         if (ci.partName != null) {
+           pStmt.setString(3, ci.partName);
+         }
+         rs = pStmt.executeQuery();
+         int numFailed = 0;
+         int numTotal = 0;
+         int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
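+         //Walk the most recent non-ATTEMPTED entries, newest first; a success decrements the counter,
+         //so this returns true only when the last 'failedThreshold' completed compactions all failed.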
+         while(rs.next() && ++numTotal <= failedThreshold) {
+           if(rs.getString(1).charAt(0) == FAILED_STATE) {
+             numFailed++;
+           }
+           else {
+             numFailed--;
+           }
+         }
+         return numFailed == failedThreshold;
+       }
+       catch (SQLException e) {
+         LOG.error("Unable to check for failed compactions " + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
+         LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+         return false;//weren't able to check
+       } finally {
+         close(rs, pStmt, dbConn);
+       }
+     } catch (RetryException e) {
+       return checkFailedCompactions(ci);
+     }
+   }
+   /**
+    * If there is an entry in COMPACTION_QUEUE with ci.id, remove it and make an entry in
+    * COMPLETED_COMPACTIONS with state 'f' (failed).
+    * If there is no entry in COMPACTION_QUEUE, it means the Initiator failed to even schedule a
+    * compaction, which we record as an ATTEMPTED_STATE entry in the history.
+    */
+   @Override
+   @RetrySemantics.CannotRetry
+   public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
+     //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       PreparedStatement pStmt = null;
+       ResultSet rs = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
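+         //Load the full COMPACTION_QUEUE row, if one exists, so it can be removed from the queue
+         //and copied into COMPLETED_COMPACTIONS below.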
+         pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+         pStmt.setLong(1, ci.id);
+         rs = pStmt.executeQuery();
+         if(rs.next()) {
+           ci = CompactionInfo.loadFullFromCompactionQueue(rs);
+           String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+           pStmt = dbConn.prepareStatement(s);
+           pStmt.setLong(1, ci.id);
+           LOG.debug("Going to execute update <" + s + ">");
+           int updCnt = pStmt.executeUpdate();
+         }
+         else {
+           if(ci.id > 0) {
+             //the record with valid CQ_ID has disappeared - this is a sign of something wrong
+             throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+           }
+         }
+         if(ci.id == 0) {
+           //The failure occurred before we even made an entry in COMPACTION_QUEUE
+           //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+           ci.id = generateCompactionQueueId(stmt);
+           //mostly this indicates that the Initiator is paying attention to some table even though
+           //compactions are not happening.
+           ci.state = ATTEMPTED_STATE;
+           //this is not strictly accurate, but 'type' cannot be null.
+           if(ci.type == null) { ci.type = CompactionType.MINOR; }
+           ci.start = getDbTime(dbConn);
+         }
+         else {
+           ci.state = FAILED_STATE;
+         }
+         close(rs, stmt, null);
+         closeStmt(pStmt);
+ 
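+         //Record the terminal state (FAILED, or ATTEMPTED when nothing was ever queued) in
+         //COMPLETED_COMPACTIONS so the history/retention logic can see this attempt.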
+         pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+         CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
+         int updCount = pStmt.executeUpdate();
+         LOG.debug("Going to commit");
+         closeStmt(pStmt);
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         try {
+           checkRetryable(dbConn, e, "markFailed(" + ci + ")");
+         }
+         catch(MetaException ex) {
+           LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+         }
+         LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
+       } finally {
+         close(rs, stmt, null);
+         close(null, pStmt, dbConn);
+       }
+     } catch (RetryException e) {
+       markFailed(ci);
+     }
+   }
+   @Override
+   @RetrySemantics.Idempotent
+   public void setHadoopJobId(String hadoopJobId, long id) {
+     try {
+       Connection dbConn = null;
+       Statement stmt = null;
+       try {
+         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         stmt = dbConn.createStatement();
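+         //Best-effort update: the Hadoop job id is informational, so SQL failures below are logged
+         //rather than rethrown to the caller.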
+         String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+         LOG.debug("Going to execute <" + s + ">");
+         int updateCount = stmt.executeUpdate(s);
+         LOG.debug("Going to commit");
+         closeStmt(stmt);
+         dbConn.commit();
+       } catch (SQLException e) {
+         LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
+         LOG.debug("Going to rollback");
+         rollbackDBConn(dbConn);
+         try {
+           checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
+         }
+         catch(MetaException ex) {
+           LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+         }
+         LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
+       } finally {
+         close(null, stmt, dbConn);
+       }
+     } catch (RetryException e) {
+       setHadoopJobId(hadoopJobId, id);
+     }
+   }
+ }
+ 
+