Posted to commits@hive.apache.org by se...@apache.org on 2018/07/25 18:27:27 UTC
[18/50] [abbrv] hive git commit: HIVE-19416 : merge master into branch (Sergey Shelukhin) 0719
http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
index 0000000,4a97f89..267c9e8
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MPartition.java
@@@ -1,0 -1,155 +1,162 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.hadoop.hive.metastore.model;
+
+ import java.util.List;
+ import java.util.Map;
+
+ public class MPartition {
+
+ private String partitionName; // partitionname ==> (key=value/)*(key=value)
+ private MTable table;
+ private List<String> values;
+ private int createTime;
+ private int lastAccessTime;
+ private MStorageDescriptor sd;
+ private Map<String, String> parameters;
-
++ private long writeId;
+
+ public MPartition() {}
+
+ /**
+ * @param partitionName
+ * @param table
+ * @param values
+ * @param createTime
+ * @param lastAccessTime
+ * @param sd
+ * @param parameters
+ */
+ public MPartition(String partitionName, MTable table, List<String> values, int createTime,
+ int lastAccessTime, MStorageDescriptor sd, Map<String, String> parameters) {
+ this.partitionName = partitionName;
+ this.table = table;
+ this.values = values;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.sd = sd;
+ this.parameters = parameters;
+ }
+
+ /**
+ * @return the lastAccessTime
+ */
+ public int getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ /**
+ * @param lastAccessTime the lastAccessTime to set
+ */
+ public void setLastAccessTime(int lastAccessTime) {
+ this.lastAccessTime = lastAccessTime;
+ }
+
+ /**
+ * @return the values
+ */
+ public List<String> getValues() {
+ return values;
+ }
+
+ /**
+ * @param values the values to set
+ */
+ public void setValues(List<String> values) {
+ this.values = values;
+ }
+
+ /**
+ * @return the table
+ */
+ public MTable getTable() {
+ return table;
+ }
+
+ /**
+ * @param table the table to set
+ */
+ public void setTable(MTable table) {
+ this.table = table;
+ }
+
+ /**
+ * @return the sd
+ */
+ public MStorageDescriptor getSd() {
+ return sd;
+ }
+
+ /**
+ * @param sd the sd to set
+ */
+ public void setSd(MStorageDescriptor sd) {
+ this.sd = sd;
+ }
+
+ /**
+ * @return the parameters
+ */
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ /**
+ * @param parameters the parameters to set
+ */
+ public void setParameters(Map<String, String> parameters) {
+ this.parameters = parameters;
+ }
+
+ /**
+ * @return the partitionName
+ */
+ public String getPartitionName() {
+ return partitionName;
+ }
+
+ /**
+ * @param partitionName the partitionName to set
+ */
+ public void setPartitionName(String partitionName) {
+ this.partitionName = partitionName;
+ }
+
+ /**
+ * @return the createTime
+ */
+ public int getCreateTime() {
+ return createTime;
+ }
+
+ /**
+ * @param createTime the createTime to set
+ */
+ public void setCreateTime(int createTime) {
+ this.createTime = createTime;
+ }
+
++ public long getWriteId() {
++ return writeId;
++ }
++
++ public void setWriteId(long writeId) {
++ this.writeId = writeId;
++ }
+ }
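
The only functional change to MPartition above is the new writeId field and its accessors. A minimal usage sketch (illustrative only, not part of this commit; the constructor arguments are placeholder values):

    import java.util.Arrays;
    import java.util.HashMap;
    import org.apache.hadoop.hive.metastore.model.MPartition;

    // Hypothetical round-trip of the new field; writeId defaults to 0 until set.
    MPartition p = new MPartition("ds=2018-07-25", null /* table */,
        Arrays.asList("2018-07-25"), 1532543247 /* createTime */,
        0 /* lastAccessTime */, null /* sd */, new HashMap<>());
    p.setWriteId(42L); // accessor added in this change
    assert p.getWriteId() == 42L;
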
http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
index 0000000,38ad479..deeb971
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
@@@ -1,0 -1,273 +1,283 @@@
++
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.hadoop.hive.metastore.model;
+
+ import java.util.List;
+ import java.util.Map;
+
+ public class MTable {
+
+ private String tableName;
+ private MDatabase database;
+ private MStorageDescriptor sd;
+ private String owner;
+ private String ownerType;
+ private int createTime;
+ private int lastAccessTime;
+ private int retention;
+ private List<MFieldSchema> partitionKeys;
+ private Map<String, String> parameters;
+ private String viewOriginalText;
+ private String viewExpandedText;
+ private boolean rewriteEnabled;
+ private String tableType;
++ private long writeId;
+
+ public MTable() {}
+
+ /**
+ * @param tableName
+ * @param database
+ * @param sd
+ * @param owner
+ * @param ownerType
+ * @param createTime
+ * @param lastAccessTime
+ * @param retention
+ * @param partitionKeys
+ * @param parameters
+ * @param viewOriginalText
+ * @param viewExpandedText
+ * @param tableType
+ */
+ public MTable(String tableName, MDatabase database, MStorageDescriptor sd, String owner, String ownerType,
+ int createTime, int lastAccessTime, int retention, List<MFieldSchema> partitionKeys,
+ Map<String, String> parameters, String viewOriginalText, String viewExpandedText,
+ boolean rewriteEnabled, String tableType) {
+ this.tableName = tableName;
+ this.database = database;
+ this.sd = sd;
+ this.owner = owner;
+ this.ownerType = ownerType;
+ this.createTime = createTime;
+ this.setLastAccessTime(lastAccessTime);
+ this.retention = retention;
+ this.partitionKeys = partitionKeys;
+ this.parameters = parameters;
+ this.viewOriginalText = viewOriginalText;
+ this.viewExpandedText = viewExpandedText;
+ this.rewriteEnabled = rewriteEnabled;
+ this.tableType = tableType;
+ }
+
+ /**
+ * @return the tableName
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @param tableName the tableName to set
+ */
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * @return the sd
+ */
+ public MStorageDescriptor getSd() {
+ return sd;
+ }
+
+ /**
+ * @param sd the sd to set
+ */
+ public void setSd(MStorageDescriptor sd) {
+ this.sd = sd;
+ }
+
+ /**
+ * @return the partKeys
+ */
+ public List<MFieldSchema> getPartitionKeys() {
+ return partitionKeys;
+ }
+
+ /**
+ * @param partKeys the partKeys to set
+ */
+ public void setPartitionKeys(List<MFieldSchema> partKeys) {
+ this.partitionKeys = partKeys;
+ }
+
+ /**
+ * @return the parameters
+ */
+ public Map<String, String> getParameters() {
+ return parameters;
+ }
+
+ /**
+ * @param parameters the parameters to set
+ */
+ public void setParameters(Map<String, String> parameters) {
+ this.parameters = parameters;
+ }
+
+ /**
+ * @return the original view text, or null if this table is not a view
+ */
+ public String getViewOriginalText() {
+ return viewOriginalText;
+ }
+
+ /**
+ * @param viewOriginalText the original view text to set
+ */
+ public void setViewOriginalText(String viewOriginalText) {
+ this.viewOriginalText = viewOriginalText;
+ }
+
+ /**
+ * @return the expanded view text, or null if this table is not a view
+ */
+ public String getViewExpandedText() {
+ return viewExpandedText;
+ }
+
+ /**
+ * @param viewExpandedText the expanded view text to set
+ */
+ public void setViewExpandedText(String viewExpandedText) {
+ this.viewExpandedText = viewExpandedText;
+ }
+
+ /**
+ * @return whether the view can be used for rewriting queries
+ */
+ public boolean isRewriteEnabled() {
+ return rewriteEnabled;
+ }
+
+ /**
+ * @param rewriteEnabled whether the view can be used for rewriting queries
+ */
+ public void setRewriteEnabled(boolean rewriteEnabled) {
+ this.rewriteEnabled = rewriteEnabled;
+ }
+
+ /**
+ * @return the owner
+ */
+ public String getOwner() {
+ return owner;
+ }
+
+ /**
+ * @param owner the owner to set
+ */
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
+
+ /**
+ * @return the owner type
+ */
+ public String getOwnerType() {
+ return ownerType;
+ }
+
+ /**
+ * @param ownerType the owner type to set
+ */
+ public void setOwnerType(String ownerType) {
+ this.ownerType = ownerType;
+ }
+
+ /**
+ * @return the createTime
+ */
+ public int getCreateTime() {
+ return createTime;
+ }
+
+ /**
+ * @param createTime the createTime to set
+ */
+ public void setCreateTime(int createTime) {
+ this.createTime = createTime;
+ }
+
+ /**
+ * @return the database
+ */
+ public MDatabase getDatabase() {
+ return database;
+ }
+
+ /**
+ * @param database the database to set
+ */
+ public void setDatabase(MDatabase database) {
+ this.database = database;
+ }
+
+ /**
+ * @return the retention
+ */
+ public int getRetention() {
+ return retention;
+ }
+
+ /**
+ * @param retention the retention to set
+ */
+ public void setRetention(int retention) {
+ this.retention = retention;
+ }
+
+ /**
+ * @param lastAccessTime the lastAccessTime to set
+ */
+ public void setLastAccessTime(int lastAccessTime) {
+ this.lastAccessTime = lastAccessTime;
+ }
+
+ /**
+ * @return the lastAccessTime
+ */
+ public int getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ /**
+ * @param tableType the tableType to set
+ */
+ public void setTableType(String tableType) {
+ this.tableType = tableType;
+ }
+
+ /**
+ * @return the tableType
+ */
+ public String getTableType() {
+ return tableType;
+ }
++
++ public long getWriteId() {
++ return writeId;
++ }
++
++ public void setWriteId(long writeId) {
++ this.writeId = writeId;
++ }
+ }
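
MTable gains the same writeId field. In the metastore schema this maps to the TBLS.WRITE_ID column (PARTITIONS.WRITE_ID for MPartition), which is what the stats-invalidation queries added to CompactionTxnHandler further down join against. A sketch of that join shape, mirroring the SELECT built in cleanEmptyAbortedTxns() below (IN-clause batching elided):

    // Illustrative only: table ids whose current WRITE_ID was produced by the given txns.
    String sql =
        "select tbl_id from TBLS"
        + " inner join DBS on TBLS.DB_ID = DBS.DB_ID"
        + " inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME"
        + " and t2w_table = TBLS.TBL_NAME and t2w_writeid = TBLS.WRITE_ID"
        + " where t2w_txnid in (/* aborted txn ids */)";
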
http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 0000000,4e3068d..1f559e9
mode 000000,100644..100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@@ -1,0 -1,1107 +1,1158 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hive.metastore.txn;
+
++import org.apache.hadoop.hive.common.StatsSetupConst;
+ import org.apache.hadoop.hive.common.classification.RetrySemantics;
+ import org.apache.hadoop.hive.metastore.api.CompactionType;
+ import org.apache.hadoop.hive.metastore.api.MetaException;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+ import org.apache.hadoop.util.StringUtils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import java.sql.Connection;
+ import java.sql.PreparedStatement;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+
+ /**
+ * Extends the transaction handler with methods needed only by the compactor threads. These
+ * methods are not available through the thrift interface.
+ */
+ class CompactionTxnHandler extends TxnHandler {
+ static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
+ static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+ public CompactionTxnHandler() {
+ }
+
+ /**
+ * This will look through the completed_txn_components table and look for partitions or tables
+ * that may be ready for compaction. Also, look through txns and txn_components tables for
+ * aborted transactions that we should add to the list.
+ * @param maxAborted Maximum number of aborted transactions to allow on a given table/partition
+ * before marking it as a potential compaction.
+ * @return list of CompactionInfo structs. These will not have id, type,
+ * or runAs set since these are only potential compactions not actual ones.
+ */
+ @Override
+ @RetrySemantics.ReadOnly
+ public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+ Connection dbConn = null;
+ Set<CompactionInfo> response = new HashSet<>();
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ // Check for completed transactions
+ String s = "select distinct ctc_database, ctc_table, " +
+ "ctc_partition from COMPLETED_TXN_COMPONENTS";
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ response.add(info);
+ }
+ rs.close();
+
+ // Check for aborted txns
+ s = "select tc_database, tc_table, tc_partition " +
+ "from TXNS, TXN_COMPONENTS " +
+ "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
+ "group by tc_database, tc_table, tc_partition " +
+ "having count(*) > " + maxAborted;
+
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ info.tooManyAborts = true;
+ response.add(info);
+ }
+
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database " + e.getMessage());
+ checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ return response;
+ }
+ catch (RetryException e) {
+ return findPotentialCompactions(maxAborted);
+ }
+ }
+
+ /**
+ * Sets the user to run as. This is for the case
+ * where the request was generated by the user and so the worker must set this value later.
+ * @param cq_id id of this entry in the queue
+ * @param user user to run the jobs as
+ */
+ @Override
+ @RetrySemantics.Idempotent
+ public void setRunAs(long cq_id, String user) throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
+ LOG.debug("Going to execute update <" + s + ">");
+ int updCnt = stmt.executeUpdate(s);
+ if (updCnt != 1) {
+ LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ". updCnt=" + updCnt);
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to update compaction queue, " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user +")");
+ } finally {
+ closeDbConn(dbConn);
+ closeStmt(stmt);
+ }
+ } catch (RetryException e) {
+ setRunAs(cq_id, user);
+ }
+ }
+
+ /**
+ * This will grab the next compaction request off of
+ * the queue, and assign it to the worker.
+ * @param workerId id of the worker calling this, will be recorded in the db
+ * @return an info element for this compaction request, or null if there is no work to do now.
+ */
+ @Override
+ @RetrySemantics.SafeToRetry
+ public CompactionInfo findNextToCompact(String workerId) throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
+ Statement updStmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "select cq_id, cq_database, cq_table, cq_partition, " +
+ "cq_type, cq_tblproperties from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'";
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ LOG.debug("No compactions found ready to compact");
+ dbConn.rollback();
+ return null;
+ }
+ updStmt = dbConn.createStatement();
+ do {
+ CompactionInfo info = new CompactionInfo();
+ info.id = rs.getLong(1);
+ info.dbname = rs.getString(2);
+ info.tableName = rs.getString(3);
+ info.partName = rs.getString(4);
+ info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
+ info.properties = rs.getString(6);
+ // Now, update this record as being worked on by this worker.
+ long now = getDbTime(dbConn);
+ s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
+ "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id +
+ " AND cq_state='" + INITIATED_STATE + "'";
+ LOG.debug("Going to execute update <" + s + ">");
+ int updCount = updStmt.executeUpdate(s);
+ if(updCount == 1) {
+ dbConn.commit();
+ return info;
+ }
+ if(updCount == 0) {
+ LOG.debug("Another Worker picked up " + info);
+ continue;
+ }
+ LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
+ info + ". updCnt=" + updCount + ".");
+ dbConn.rollback();
+ return null;
+ } while( rs.next());
+ dbConn.rollback();
+ return null;
+ } catch (SQLException e) {
+ LOG.error("Unable to select next element for compaction, " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + ")");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ closeStmt(updStmt);
+ close(rs, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ return findNextToCompact(workerId);
+ }
+ }
+
+ /**
+ * This will mark an entry in the queue as compacted
+ * and put it in the ready to clean state.
+ * @param info info on the compaction entry to mark as compacted.
+ */
+ @Override
+ @RetrySemantics.SafeToRetry
+ public void markCompacted(CompactionInfo info) throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
+ "cq_worker_id = null where cq_id = " + info.id;
+ LOG.debug("Going to execute update <" + s + ">");
+ int updCnt = stmt.executeUpdate(s);
+ if (updCnt != 1) {
+ LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". updCnt=" + updCnt);
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to update compaction queue " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "markCompacted(" + info + ")");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ markCompacted(info);
+ }
+ }
+
+ /**
+ * Find entries in the queue that are ready to
+ * be cleaned.
+ * @return information on the entry in the queue.
+ */
+ @Override
+ @RetrySemantics.ReadOnly
+ public List<CompactionInfo> findReadyToClean() throws MetaException {
+ Connection dbConn = null;
+ List<CompactionInfo> rc = new ArrayList<>();
+
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "select cq_id, cq_database, cq_table, cq_partition, "
+ + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '"
+ + READY_FOR_CLEANING + "'";
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ while (rs.next()) {
+ CompactionInfo info = new CompactionInfo();
+ info.id = rs.getLong(1);
+ info.dbname = rs.getString(2);
+ info.tableName = rs.getString(3);
+ info.partName = rs.getString(4);
+ switch (rs.getString(5).charAt(0)) {
+ case MAJOR_TYPE: info.type = CompactionType.MAJOR; break;
+ case MINOR_TYPE: info.type = CompactionType.MINOR; break;
+ default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
+ }
+ info.runAs = rs.getString(6);
+ info.highestWriteId = rs.getLong(7);
+ rc.add(info);
+ }
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ return rc;
+ } catch (SQLException e) {
+ LOG.error("Unable to select next element for cleaning, " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findReadyToClean");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ return findReadyToClean();
+ }
+ }
+
+ /**
+ * This will remove an entry from the queue after
+ * it has been compacted.
+ *
+ * @param info info on the compaction entry to remove
+ */
+ @Override
+ @RetrySemantics.CannotRetry
+ public void markCleaned(CompactionInfo info) throws MetaException {
+ try {
+ Connection dbConn = null;
+ PreparedStatement pStmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+ pStmt.setLong(1, info.id);
+ rs = pStmt.executeQuery();
+ if(rs.next()) {
+ info = CompactionInfo.loadFullFromCompactionQueue(rs);
+ }
+ else {
+ throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
+ }
+ close(rs);
+ String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+ pStmt = dbConn.prepareStatement(s);
+ pStmt.setLong(1, info.id);
+ LOG.debug("Going to execute update <" + s + ">");
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount);
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ }
+ pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+ info.state = SUCCEEDED_STATE;
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
+ updCount = pStmt.executeUpdate();
+
+ // Remove entries from completed_txn_components as well, so we don't start looking there
+ // again, but only up to the highest write ID included in this compaction job.
+ //highestWriteId will be NULL in upgrade scenarios
+ s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " +
+ "ctc_table = ?";
+ if (info.partName != null) {
+ s += " and ctc_partition = ?";
+ }
+ if(info.highestWriteId != 0) {
+ s += " and ctc_writeid <= ?";
+ }
+ pStmt = dbConn.prepareStatement(s);
+ int paramCount = 1;
+ pStmt.setString(paramCount++, info.dbname);
+ pStmt.setString(paramCount++, info.tableName);
+ if (info.partName != null) {
+ pStmt.setString(paramCount++, info.partName);
+ }
+ if(info.highestWriteId != 0) {
+ pStmt.setLong(paramCount++, info.highestWriteId);
+ }
+ LOG.debug("Going to execute update <" + s + ">");
+ if (pStmt.executeUpdate() < 1) {
+ LOG.error("Expected to remove at least one row from completed_txn_components when " +
+ "marking compaction entry as clean!");
+ }
+
+ s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
+ TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
+ if (info.highestWriteId != 0) s += " and tc_writeid <= ?";
+ if (info.partName != null) s += " and tc_partition = ?";
+
+ pStmt = dbConn.prepareStatement(s);
+ paramCount = 1;
+ pStmt.setString(paramCount++, info.dbname);
+ pStmt.setString(paramCount++, info.tableName);
+ if(info.highestWriteId != 0) {
+ pStmt.setLong(paramCount++, info.highestWriteId);
+ }
+ if (info.partName != null) {
+ pStmt.setString(paramCount++, info.partName);
+ }
+
+ LOG.debug("Going to execute update <" + s + ">");
+ rs = pStmt.executeQuery();
+ List<Long> txnids = new ArrayList<>();
+ List<String> questions = new ArrayList<>();
+ while (rs.next()) {
+ long id = rs.getLong(1);
+ txnids.add(id);
+ questions.add("?");
+ }
+ // Remove entries from txn_components, as there may be aborted txn components
+ if (txnids.size() > 0) {
+ List<String> queries = new ArrayList<>();
+
+ // Prepare prefix and suffix
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+
+ prefix.append("delete from TXN_COMPONENTS where ");
+
+ //because 1 txn may include different partitions/tables even in auto commit mode
+ suffix.append(" and tc_database = ?");
+ suffix.append(" and tc_table = ?");
+ if (info.partName != null) {
+ suffix.append(" and tc_partition = ?");
+ }
+
+ // Populate the complete query with provided prefix and suffix
+ List<Integer> counts = TxnUtils
+ .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid",
+ true, false);
+ int totalCount = 0;
+ for (int i = 0; i < queries.size(); i++) {
+ String query = queries.get(i);
+ int insertCount = counts.get(i);
+
+ LOG.debug("Going to execute update <" + query + ">");
+ pStmt = dbConn.prepareStatement(query);
+ for (int j = 0; j < insertCount; j++) {
+ pStmt.setLong(j + 1, txnids.get(totalCount + j));
+ }
+ totalCount += insertCount;
+ paramCount = insertCount + 1;
+ pStmt.setString(paramCount++, info.dbname);
+ pStmt.setString(paramCount++, info.tableName);
+ if (info.partName != null) {
+ pStmt.setString(paramCount++, info.partName);
+ }
+ int rc = pStmt.executeUpdate();
+ LOG.debug("Removed " + rc + " records from txn_components");
+
+ // Don't bother cleaning from the txns table. A separate call will do that. We don't
+ // know here which txns still have components from other tables or partitions in the
+ // table, so we don't know which ones we can and cannot clean.
+ }
+ }
+
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to delete from compaction queue " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "markCleaned(" + info + ")");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, pStmt, dbConn);
+ }
+ } catch (RetryException e) {
+ markCleaned(info);
+ }
+ }
+
+ /**
+ * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommitted_txnid as found by
+ * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+ */
+ @Override
+ @RetrySemantics.SafeToRetry
+ public void cleanTxnToWriteIdTable() throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+
+ try {
+ // We query for minimum values in all the queries and they can only increase by any concurrent
+ // operations. So, READ COMMITTED is sufficient.
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+
+ // First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
+ // If there are no txns which are currently open or aborted in the system, then current value of
+ // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
+ String s = "select ntxn_next from NEXT_TXN_ID";
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new MetaException("Transaction tables not properly " +
+ "initialized, no record found in next_txn_id");
+ }
+ long minUncommittedTxnId = rs.getLong(1);
+
+ // If there are any open txns, then the minimum of min_open_txnid from MIN_HISTORY_LEVEL table
+ // could be the min_uncommitted_txnid if lesser than NEXT_TXN_ID.ntxn_next.
+ s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL";
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (rs.next()) {
+ long minOpenTxnId = rs.getLong(1);
+ if (minOpenTxnId > 0) {
+ minUncommittedTxnId = Math.min(minOpenTxnId, minUncommittedTxnId);
+ }
+ }
+
+ // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid
+ // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL.mhl_min_open_txnid).
+ s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED);
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (rs.next()) {
+ long minAbortedTxnId = rs.getLong(1);
+ if (minAbortedTxnId > 0) {
+ minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId);
+ }
+ }
+
+ // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed
+ // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table.
+ s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId;
+ LOG.debug("Going to execute delete <" + s + ">");
+ int rc = stmt.executeUpdate(s);
+ LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId);
+
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to delete from txns table " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "cleanTxnToWriteIdTable");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ cleanTxnToWriteIdTable();
+ }
+ }
+
+ /**
+ * Clean up aborted transactions from txns that have no components in txn_components. Such
+ * txns can exist when no work was done in the txn (e.g. Streaming opened a TransactionBatch and
+ * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+ */
+ @Override
+ @RetrySemantics.SafeToRetry
+ public void cleanEmptyAbortedTxns() throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ //Aborted is a terminal state, so nothing about the txn can change
+ //after that, so READ COMMITTED is sufficient.
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "select txn_id from TXNS where " +
- "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
- "txn_state = '" + TXN_ABORTED + "'";
++ "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " +
++ "txn_state = '" + TXN_ABORTED + "'";
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ List<Long> txnids = new ArrayList<>();
+ while (rs.next()) txnids.add(rs.getLong(1));
+ close(rs);
+ if(txnids.size() <= 0) {
+ return;
+ }
+ Collections.sort(txnids);//easier to read logs
++
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+
++ // Turn off COLUMN_STATS_ACCURATE for txnids' components in TBLS and PARTITIONS
++ prefix.append("select tbl_id from TBLS inner join DBS on TBLS.DB_ID = DBS.DB_ID "
++ + "inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and t2w_table = TBLS.TBL_NAME"
++ + " and t2w_writeid = TBLS.WRITE_ID where ");
++ suffix.append("");
++ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "t2w_txnid", true, false);
++
++ // Delete COLUMN_STATS_ACCURATE.BASIC_STATS rows from TABLE_PARAMS for the txnids.
++ List<StringBuilder> finalCommands = new ArrayList<>(queries.size());
++ for (int i = 0; i < queries.size(); i++) {
++ String query = queries.get(i);
++ finalCommands.add(i, new StringBuilder("delete from TABLE_PARAMS " +
++ " where param_key = '" + "COLUMN_STATS_ACCURATE" + "' and tbl_id in ("));
++ finalCommands.get(i).append(query + ")");
++ LOG.debug("Going to execute update <" + finalCommands.get(i) + ">");
++ int rc = stmt.executeUpdate(finalCommands.get(i).toString());
++ LOG.info("Turned off " + rc + " COLUMN_STATE_ACCURATE.BASIC_STATS states from TBLS");
++ }
++
++ queries.clear();
++ prefix.setLength(0);
++ suffix.setLength(0);
++ finalCommands.clear();
++
++ // Delete COLUMN_STATS_ACCURATE.BASIC_STATS rows from PARTITIONS_PARAMS for the txnids.
++ prefix.append("select part_id from PARTITIONS "
++ + "inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID "
++ + "inner join DBS on TBLS.DB_ID = DBS.DB_ID "
++ + "inner join TXN_TO_WRITE_ID on t2w_database = DBS.NAME and t2w_table = TBLS.TBL_NAME"
++ + " and t2w_writeid = TBLS.WRITE_ID where ");
++ suffix.append("");
++ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "t2w_txnid", true, false);
++
++ for (int i = 0; i < queries.size(); i++) {
++ String query = queries.get(i);
++ finalCommands.add(i, new StringBuilder("delete from PARTITION_PARAMS " +
++ " where param_key = '" + "COLUMN_STATS_ACCURATE" + "' and part_id in ("));
++ finalCommands.get(i).append(query + ")");
++ LOG.debug("Going to execute update <" + finalCommands.get(i) + ">");
++ int rc = stmt.executeUpdate(finalCommands.get(i).toString());
++ LOG.info("Turned off " + rc + " COLUMN_STATE_ACCURATE.BASIC_STATS states from PARTITIONS");
++ }
++
++ queries.clear();
++ prefix.setLength(0);
++ suffix.setLength(0);
++ finalCommands.clear();
++
++ // Delete from TXNS.
+ prefix.append("delete from TXNS where ");
+ suffix.append("");
+
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "txn_id", false, false);
+
+ for (String query : queries) {
+ LOG.debug("Going to execute update <" + query + ">");
+ int rc = stmt.executeUpdate(query);
+ LOG.info("Removed " + rc + " empty Aborted transactions from TXNS");
+ }
+ LOG.info("Aborted transactions removed from TXNS: " + txnids);
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to delete from txns table " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "cleanEmptyAbortedTxns");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ cleanEmptyAbortedTxns();
+ }
+ }
+
+ /**
+ * This will take all entries assigned to workers
+ * on a host and return them to the INITIATED state. The initiator should use this at start up to
+ * clean entries from any workers that were in the middle of compacting when the metastore
+ * shut down. It does not reset entries from worker threads on other hosts as those may still
+ * be working.
+ * @param hostname Name of this host. It is assumed to prefix each thread's worker id,
+ * so that a LIKE 'hostname%' pattern will match the worker id.
+ */
+ @Override
+ @RetrySemantics.Idempotent
+ public void revokeFromLocalWorkers(String hostname) throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
+ + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '"
+ + hostname + "%'";
+ LOG.debug("Going to execute update <" + s + ">");
+ // It isn't an error if the following returns no rows, as the local workers could have died
+ // with nothing assigned to them.
+ stmt.executeUpdate(s);
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to change dead worker's records back to initiated state " +
+ e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + hostname +")");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ revokeFromLocalWorkers(hostname);
+ }
+ }
+
+ /**
+ * This call will return all compaction queue
+ * entries assigned to a worker but past the timeout back to the INITIATED state.
+ * This should be called by the initiator on start up and occasionally when running to clean up
+ * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called
+ * first.
+ * @param timeout number of milliseconds since start time that should elapse before a worker is
+ * declared dead.
+ */
+ @Override
+ @RetrySemantics.Idempotent
+ public void revokeTimedoutWorkers(long timeout) throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ long latestValidStart = getDbTime(dbConn) - timeout;
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '"
+ + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < "
+ + latestValidStart;
+ LOG.debug("Going to execute update <" + s + ">");
+ // It isn't an error if the following returns no rows, as the local workers could have died
+ // with nothing assigned to them.
+ stmt.executeUpdate(s);
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to change dead worker's records back to initiated state " +
+ e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + ")");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ closeStmt(stmt);
+ closeDbConn(dbConn);
+ }
+ } catch (RetryException e) {
+ revokeTimedoutWorkers(timeout);
+ }
+ }
+
+ /**
+ * Queries metastore DB directly to find columns in the table which have statistics information.
+ * If {@code ci} includes partition info then per partition stats info is examined, otherwise
+ * table level stats are examined.
+ * @throws MetaException
+ */
+ @Override
+ @RetrySemantics.ReadOnly
+ public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
+ Connection dbConn = null;
+ PreparedStatement pStmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ String quote = getIdentifierQuoteString(dbConn);
+ StringBuilder bldr = new StringBuilder();
+ bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
+ .append(" FROM ")
+ .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS"))
+ .append(quote)
+ .append(" WHERE ")
+ .append(quote).append("DB_NAME").append(quote).append(" = ?")
+ .append(" AND ").append(quote).append("TABLE_NAME").append(quote)
+ .append(" = ?");
+ if (ci.partName != null) {
+ bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?");
+ }
+ String s = bldr.toString();
+ pStmt = dbConn.prepareStatement(s);
+ pStmt.setString(1, ci.dbname);
+ pStmt.setString(2, ci.tableName);
+ if (ci.partName != null) {
+ pStmt.setString(3, ci.partName);
+ }
+
+ /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" :
+ "PART_COL_STATS")
+ + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+ + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/
+ LOG.debug("Going to execute <" + s + ">");
+ rs = pStmt.executeQuery();
+ List<String> columns = new ArrayList<>();
+ while (rs.next()) {
+ columns.add(rs.getString(1));
+ }
+ LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName +
+ (ci.partName == null ? "" : "/" + ci.partName));
+ dbConn.commit();
+ return columns;
+ } catch (SQLException e) {
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName +
+ (ci.partName == null ? "" : "/" + ci.partName) + ")");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, pStmt, dbConn);
+ }
+ } catch (RetryException ex) {
+ return findColumnsWithStats(ci);
+ }
+ }
+
+ /**
+ * Record the highest write id that the {@code ci} compaction job will pay attention to.
+ * It is derived from the highest resolved txn id, i.e. one such that there are no open txns with lower ids.
+ */
+ @Override
+ @RetrySemantics.Idempotent
+ public void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + highestWriteId +
+ " WHERE CQ_ID = " + ci.id);
+ if(updCount != 1) {
+ throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci);
+ }
+ dbConn.commit();
+ } catch (SQLException e) {
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + highestWriteId + ")");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(null, stmt, dbConn);
+ }
+ } catch (RetryException ex) {
+ setCompactionHighestWriteId(ci, highestWriteId);
+ }
+ }
+ private static class RetentionCounters {
+ int attemptedRetention = 0;
+ int failedRetention = 0;
+ int succeededRetention = 0;
+ RetentionCounters(int attemptedRetention, int failedRetention, int succeededRetention) {
+ this.attemptedRetention = attemptedRetention;
+ this.failedRetention = failedRetention;
+ this.succeededRetention = succeededRetention;
+ }
+ }
+ private void checkForDeletion(List<Long> deleteSet, CompactionInfo ci, RetentionCounters rc) {
+ switch (ci.state) {
+ case ATTEMPTED_STATE:
+ if(--rc.attemptedRetention < 0) {
+ deleteSet.add(ci.id);
+ }
+ break;
+ case FAILED_STATE:
+ if(--rc.failedRetention < 0) {
+ deleteSet.add(ci.id);
+ }
+ break;
+ case SUCCEEDED_STATE:
+ if(--rc.succeededRetention < 0) {
+ deleteSet.add(ci.id);
+ }
+ break;
+ default:
+ //do nothing, to handle future RU/D where we may want to add new state types
+ }
+ }
+
+ /**
+ * For any given compactable entity (partition; table if not partitioned) the history of compactions
+ * may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
+ * history such that a configurable number of each type of state is present. Any other entries
+ * can be purged. This scheme has the advantage of always retaining the last failure/success even if
+ * it's not recent.
+ * @throws MetaException
+ */
+ @Override
+ @RetrySemantics.SafeToRetry
+ public void purgeCompactionHistory() throws MetaException {
+ Connection dbConn = null;
+ Statement stmt = null;
+ PreparedStatement pStmt = null;
+ ResultSet rs = null;
+ List<Long> deleteSet = new ArrayList<>();
+ RetentionCounters rc = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ /*cc_id is monotonically increasing, so for any entity it sorts in order of compaction history;
+ thus this query groups by entity and, within each group, sorts most recent first*/
+ rs = stmt.executeQuery("select cc_id, cc_database, cc_table, cc_partition, cc_state from " +
+ "COMPLETED_COMPACTIONS order by cc_database, cc_table, cc_partition, cc_id desc");
+ String lastCompactedEntity = null;
+ /*In each group, walk from the most recent and count occurrences of each state type. Once you
+ * have counted enough (for each state) to satisfy the retention policy, delete all other
+ * instances of this status.*/
+ while(rs.next()) {
+ CompactionInfo ci = new CompactionInfo(rs.getLong(1), rs.getString(2), rs.getString(3), rs.getString(4), rs.getString(5).charAt(0));
+ if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
+ lastCompactedEntity = ci.getFullPartitionName();
+ rc = new RetentionCounters(MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ getFailedCompactionRetention(),
+ MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
+ }
+ checkForDeletion(deleteSet, ci, rc);
+ }
+ close(rs);
+
+ if (deleteSet.size() <= 0) {
+ return;
+ }
+
+ List<String> queries = new ArrayList<>();
+
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+
+ prefix.append("delete from COMPLETED_COMPACTIONS where ");
+ suffix.append("");
+
+ List<String> questions = new ArrayList<>(deleteSet.size());
+ for (int i = 0; i < deleteSet.size(); i++) {
+ questions.add("?");
+ }
+ List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false);
+ int totalCount = 0;
+ for (int i = 0; i < queries.size(); i++) {
+ String query = queries.get(i);
+ long insertCount = counts.get(i);
+ LOG.debug("Going to execute update <" + query + ">");
+ pStmt = dbConn.prepareStatement(query);
+ for (int j = 0; j < insertCount; j++) {
+ pStmt.setLong(j + 1, deleteSet.get(totalCount + j));
+ }
+ totalCount += insertCount;
+ int count = pStmt.executeUpdate();
+ LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS");
+ }
+ dbConn.commit();
+ } catch (SQLException e) {
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "purgeCompactionHistory()");
+ throw new MetaException("Unable to connect to transaction database " +
+ StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ closeStmt(pStmt);
+ }
+ } catch (RetryException ex) {
+ purgeCompactionHistory();
+ }
+ }
+ /**
+ * This ensures that the number of failed compaction entries retained is at least the failed
+ * compaction threshold beyond which new compactions stop being scheduled.
+ */
+ private int getFailedCompactionRetention() {
+ int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+ int failedRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
+ if(failedRetention < failedThreshold) {
+ LOG.warn("Invalid configuration " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname() +
+ "=" + failedRetention + " < " + ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED + "=" +
+ failedRetention + ". Will use " + ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname() +
+ "=" + failedRetention);
+ failedRetention = failedThreshold;
+ }
+ return failedRetention;
+ }
+ /**
+ * Returns {@code true} if there already exists a sufficient number of consecutive failures for
+ * this table/partition, so that no new automatic compactions will be scheduled.
+ * User initiated compactions don't do this check.
+ *
+ * Do we allow compacting a whole table (when it's partitioned)? No, though perhaps we should.
+ * That would be a meta operation, i.e. first find all partitions for this table (which have
+ * txn info) and schedule each compaction separately. This avoids complications in this logic.
+ */
+ @Override
+ @RetrySemantics.ReadOnly
+ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
+ Connection dbConn = null;
+ PreparedStatement pStmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " +
+ "CC_DATABASE = ? and " +
+ "CC_TABLE = ? " +
+ (ci.partName != null ? "and CC_PARTITION = ?" : "") +
+ " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
+ pStmt.setString(1, ci.dbname);
+ pStmt.setString(2, ci.tableName);
+ if (ci.partName != null) {
+ pStmt.setString(3, ci.partName);
+ }
+ rs = pStmt.executeQuery();
+ int numFailed = 0;
+ int numTotal = 0;
+ int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+ while(rs.next() && ++numTotal <= failedThreshold) {
+ if(rs.getString(1).charAt(0) == FAILED_STATE) {
+ numFailed++;
+ }
+ else {
+ numFailed--;
+ }
+ }
+ return numFailed == failedThreshold;
+ }
+ catch (SQLException e) {
+ LOG.error("Unable to check for failed compactions " + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
+ LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
+ return false;//weren't able to check
+ } finally {
+ close(rs, pStmt, dbConn);
+ }
+ } catch (RetryException e) {
+ return checkFailedCompactions(ci);
+ }
+ }
+ /**
+ * If there is an entry in compaction_queue with ci.id, remove it and
+ * make an entry in completed_compactions with status 'f'.
+ * If there is no entry in compaction_queue, it means the Initiator failed to even schedule a compaction,
+ * which we record as an ATTEMPTED_STATE entry in history.
+ */
+ @Override
+ @RetrySemantics.CannotRetry
+ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
+ //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ PreparedStatement pStmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+ pStmt.setLong(1, ci.id);
+ rs = pStmt.executeQuery();
+ if(rs.next()) {
+ ci = CompactionInfo.loadFullFromCompactionQueue(rs);
+ String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+ pStmt = dbConn.prepareStatement(s);
+ pStmt.setLong(1, ci.id);
+ LOG.debug("Going to execute update <" + s + ">");
+ int updCnt = pStmt.executeUpdate();
+ }
+ else {
+ if(ci.id > 0) {
+ //the record with valid CQ_ID has disappeared - this is a sign of something wrong
+ throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+ }
+ }
+ if(ci.id == 0) {
+ //The failure occurred before we even made an entry in COMPACTION_QUEUE
+ //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+ ci.id = generateCompactionQueueId(stmt);
+ //mostly this indicates that the Initiator is paying attention to some table even though
+ //compactions are not happening.
+ ci.state = ATTEMPTED_STATE;
+ //this is not strictly accurate, but 'type' cannot be null.
+ if(ci.type == null) { ci.type = CompactionType.MINOR; }
+ ci.start = getDbTime(dbConn);
+ }
+ else {
+ ci.state = FAILED_STATE;
+ }
+ close(rs, stmt, null);
+ closeStmt(pStmt);
+
+ pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ LOG.debug("Going to commit");
+ closeStmt(pStmt);
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ try {
+ checkRetryable(dbConn, e, "markFailed(" + ci + ")");
+ }
+ catch(MetaException ex) {
+ LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+ }
+ LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
+ } finally {
+ close(rs, stmt, null);
+ close(null, pStmt, dbConn);
+ }
+ } catch (RetryException e) {
+ markFailed(ci);
+ }
+ }
+ @Override
+ @RetrySemantics.Idempotent
+ public void setHadoopJobId(String hadoopJobId, long id) {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "update COMPACTION_QUEUE set CQ_HADOOP_JOB_ID = " + quoteString(hadoopJobId) + " WHERE CQ_ID = " + id;
+ LOG.debug("Going to execute <" + s + ">");
+ int updateCount = stmt.executeUpdate(s);
+ LOG.debug("Going to commit");
+ closeStmt(stmt);
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ try {
+ checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
+ }
+ catch(MetaException ex) {
+ LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
+ }
+ LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
+ } finally {
+ close(null, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ setHadoopJobId(hadoopJobId, id);
+ }
+ }
+ }
+
+
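
Every handler method above follows the same retry idiom: do the JDBC work; on SQLException, call checkRetryable(), which throws RetryException when the failure looks transient; the outer catch then re-invokes the method. A self-contained sketch of that control flow (the classification logic in this checkRetryable stand-in is an assumption, not the real implementation):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    class RetryIdiomSketch {
      static class RetryException extends Exception {}

      // Stand-in for TxnHandler.checkRetryable: treat serialization failures as transient.
      static void checkRetryable(SQLException e) throws RetryException {
        if ("40001".equals(e.getSQLState())) {
          throw new RetryException();
        }
      }

      void setRunAs(long cqId, String user) {
        try {
          try (Connection dbConn = DriverManager.getConnection("jdbc:...") /* placeholder URL */) {
            // ... executeUpdate + commit, as in the real method ...
          } catch (SQLException e) {
            checkRetryable(e);             // rethrows as RetryException if transient
            throw new RuntimeException(e); // non-retryable: surface the error
          }
        } catch (RetryException e) {
          setRunAs(cqId, user); // same tail-recursive retry shape as the methods above
        }
      }
    }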