Posted to commits@hive.apache.org by sa...@apache.org on 2018/02/23 16:31:00 UTC
[02/21] hive git commit: HIVE-18192: Introduce WriteID per table rather than using global transaction ID (Sankar Hariappan, reviewed by Eugene Koifman)
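For orientation before the per-file diffs: a minimal, self-contained sketch (not part of the patch; the table name and id values are made up) of how a reader-side list is built from the new TableValidWriteIds thrift struct this change introduces. Reads are now filtered by a per-table write id list instead of the global txn list.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class WriteIdSketch {
      public static void main(String[] args) {
        // Hypothetical state for table db1.tbl1: write ids up to 5 allocated,
        // write id 3 still open, write id 4 aborted.
        BitSet abortedBits = new BitSet();
        abortedBits.set(1); // second entry of the invalid list (write id 4) is aborted
        TableValidWriteIds ids = new TableValidWriteIds("db1.tbl1", 5,
            Arrays.asList(3L, 4L), ByteBuffer.wrap(abortedBits.toByteArray()));
        ids.setMinOpenWriteId(3L);

        ValidReaderWriteIdList reader = TxnUtils.createValidReaderWriteIdList(ids);
        System.out.println(reader.isWriteIdValid(2)); // true: committed, below the HWM
        System.out.println(reader.isWriteIdValid(3)); // false: still open
      }
    }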
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index a90b7d4..ba006cf 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -286,8 +286,9 @@ class CompactionTxnHandler extends TxnHandler {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "select cq_id, cq_database, cq_table, cq_partition, " +
- "cq_type, cq_run_as, cq_highest_txn_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'";
+ String s = "select cq_id, cq_database, cq_table, cq_partition, "
+ + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '"
+ + READY_FOR_CLEANING + "'";
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
while (rs.next()) {
@@ -302,7 +303,7 @@ class CompactionTxnHandler extends TxnHandler {
default: throw new MetaException("Unexpected compaction type " + rs.getString(5));
}
info.runAs = rs.getString(6);
- info.highestTxnId = rs.getLong(7);
+ info.highestWriteId = rs.getLong(7);
rc.add(info);
}
LOG.debug("Going to rollback");
@@ -338,7 +339,7 @@ class CompactionTxnHandler extends TxnHandler {
ResultSet rs = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+ pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
pStmt.setLong(1, info.id);
rs = pStmt.executeQuery();
if(rs.next()) {
@@ -358,21 +359,21 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to rollback");
dbConn.rollback();
}
- pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+ pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
info.state = SUCCEEDED_STATE;
CompactionInfo.insertIntoCompletedCompactions(pStmt, info, getDbTime(dbConn));
updCount = pStmt.executeUpdate();
// Remove entries from completed_txn_components as well, so we don't start looking there
- // again but only up to the highest txn ID include in this compaction job.
- //highestTxnId will be NULL in upgrade scenarios
+ // again, but only up to the highest write ID included in this compaction job.
+ // highestWriteId will be NULL in upgrade scenarios
s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " +
"ctc_table = ?";
if (info.partName != null) {
s += " and ctc_partition = ?";
}
- if(info.highestTxnId != 0) {
- s += " and ctc_txnid <= ?";
+ if(info.highestWriteId != 0) {
+ s += " and ctc_writeid <= ?";
}
pStmt = dbConn.prepareStatement(s);
int paramCount = 1;
@@ -381,8 +382,8 @@ class CompactionTxnHandler extends TxnHandler {
if (info.partName != null) {
pStmt.setString(paramCount++, info.partName);
}
- if(info.highestTxnId != 0) {
- pStmt.setLong(paramCount++, info.highestTxnId);
+ if(info.highestWriteId != 0) {
+ pStmt.setLong(paramCount++, info.highestWriteId);
}
LOG.debug("Going to execute update <" + s + ">");
if (pStmt.executeUpdate() < 1) {
@@ -392,15 +393,15 @@ class CompactionTxnHandler extends TxnHandler {
s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
- if (info.highestTxnId != 0) s += " and txn_id <= ?";
+ if (info.highestWriteId != 0) s += " and tc_writeid <= ?";
if (info.partName != null) s += " and tc_partition = ?";
pStmt = dbConn.prepareStatement(s);
paramCount = 1;
pStmt.setString(paramCount++, info.dbname);
pStmt.setString(paramCount++, info.tableName);
- if(info.highestTxnId != 0) {
- pStmt.setLong(paramCount++, info.highestTxnId);
+ if(info.highestWriteId != 0) {
+ pStmt.setLong(paramCount++, info.highestWriteId);
}
if (info.partName != null) {
pStmt.setString(paramCount++, info.partName);
@@ -700,14 +701,14 @@ class CompactionTxnHandler extends TxnHandler {
*/
@Override
@RetrySemantics.Idempotent
- public void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException {
+ public void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException {
Connection dbConn = null;
Statement stmt = null;
try {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_TXN_ID = " + highestTxnId +
+ int updCount = stmt.executeUpdate("UPDATE COMPACTION_QUEUE SET CQ_HIGHEST_WRITE_ID = " + highestWriteId +
" WHERE CQ_ID = " + ci.id);
if(updCount != 1) {
throw new IllegalStateException("Could not find record in COMPACTION_QUEUE for " + ci);
@@ -715,14 +716,14 @@ class CompactionTxnHandler extends TxnHandler {
dbConn.commit();
} catch (SQLException e) {
rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "setCompactionHighestTxnId(" + ci + "," + highestTxnId + ")");
+ checkRetryable(dbConn, e, "setCompactionHighestWriteId(" + ci + "," + highestWriteId + ")");
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
close(null, stmt, dbConn);
}
} catch (RetryException ex) {
- setCompactionHighestTxnId(ci, highestTxnId);
+ setCompactionHighestWriteId(ci, highestWriteId);
}
}
private static class RetentionCounters {
@@ -932,7 +933,7 @@ class CompactionTxnHandler extends TxnHandler {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+ pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_WRITE_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
pStmt.setLong(1, ci.id);
rs = pStmt.executeQuery();
if(rs.next()) {
@@ -966,7 +967,7 @@ class CompactionTxnHandler extends TxnHandler {
close(rs, stmt, null);
closeStmt(pStmt);
- pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
+ pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_WRITE_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
int updCount = pStmt.executeUpdate();
LOG.debug("Going to commit");
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index e724723..88f6346 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -86,16 +86,29 @@ public final class TxnDbUtil {
" TC_DATABASE varchar(128) NOT NULL," +
" TC_TABLE varchar(128)," +
" TC_PARTITION varchar(767)," +
- " TC_OPERATION_TYPE char(1) NOT NULL)");
+ " TC_OPERATION_TYPE char(1) NOT NULL," +
+ " TC_WRITEID bigint)");
stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
" CTC_TXNID bigint," +
" CTC_DATABASE varchar(128) NOT NULL," +
" CTC_TABLE varchar(128)," +
" CTC_PARTITION varchar(767)," +
" CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL," +
- " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)");
+ " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," +
+ " CTC_WRITEID bigint)");
stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)");
stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
+
+ stmt.execute("CREATE TABLE TXN_TO_WRITE_ID (" +
+ " T2W_TXNID bigint NOT NULL," +
+ " T2W_DATABASE varchar(128) NOT NULL," +
+ " T2W_TABLE varchar(256) NOT NULL," +
+ " T2W_WRITEID bigint NOT NULL)");
+ stmt.execute("CREATE TABLE NEXT_WRITE_ID (" +
+ " NWI_DATABASE varchar(128) NOT NULL," +
+ " NWI_TABLE varchar(256) NOT NULL," +
+ " NWI_NEXT bigint NOT NULL)");
+
stmt.execute("CREATE TABLE HIVE_LOCKS (" +
" HL_LOCK_EXT_ID bigint NOT NULL," +
" HL_LOCK_INT_ID bigint NOT NULL," +
@@ -130,7 +143,7 @@ public final class TxnDbUtil {
" CQ_WORKER_ID varchar(128)," +
" CQ_START bigint," +
" CQ_RUN_AS varchar(128)," +
- " CQ_HIGHEST_TXN_ID bigint," +
+ " CQ_HIGHEST_WRITE_ID bigint," +
" CQ_META_INFO varchar(2048) for bit data," +
" CQ_HADOOP_JOB_ID varchar(32))");
@@ -138,20 +151,20 @@ public final class TxnDbUtil {
stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
stmt.execute("CREATE TABLE COMPLETED_COMPACTIONS (" +
- " CC_ID bigint PRIMARY KEY," +
- " CC_DATABASE varchar(128) NOT NULL," +
- " CC_TABLE varchar(128) NOT NULL," +
- " CC_PARTITION varchar(767)," +
- " CC_STATE char(1) NOT NULL," +
- " CC_TYPE char(1) NOT NULL," +
- " CC_TBLPROPERTIES varchar(2048)," +
- " CC_WORKER_ID varchar(128)," +
- " CC_START bigint," +
- " CC_END bigint," +
- " CC_RUN_AS varchar(128)," +
- " CC_HIGHEST_TXN_ID bigint," +
- " CC_META_INFO varchar(2048) for bit data," +
- " CC_HADOOP_JOB_ID varchar(32))");
+ " CC_ID bigint PRIMARY KEY," +
+ " CC_DATABASE varchar(128) NOT NULL," +
+ " CC_TABLE varchar(128) NOT NULL," +
+ " CC_PARTITION varchar(767)," +
+ " CC_STATE char(1) NOT NULL," +
+ " CC_TYPE char(1) NOT NULL," +
+ " CC_TBLPROPERTIES varchar(2048)," +
+ " CC_WORKER_ID varchar(128)," +
+ " CC_START bigint," +
+ " CC_END bigint," +
+ " CC_RUN_AS varchar(128)," +
+ " CC_HIGHEST_WRITE_ID bigint," +
+ " CC_META_INFO varchar(2048) for bit data," +
+ " CC_HADOOP_JOB_ID varchar(32))");
stmt.execute("CREATE TABLE AUX_TABLE (" +
" MT_KEY1 varchar(128) NOT NULL," +
@@ -219,6 +232,8 @@ public final class TxnDbUtil {
success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount);
success &= dropTable(stmt, "TXNS", retryCount);
success &= dropTable(stmt, "NEXT_TXN_ID", retryCount);
+ success &= dropTable(stmt, "TXN_TO_WRITE_ID", retryCount);
+ success &= dropTable(stmt, "NEXT_WRITE_ID", retryCount);
success &= dropTable(stmt, "HIVE_LOCKS", retryCount);
success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount);
success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount);
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 1bb976c..ac61715 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -61,6 +61,7 @@ import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
@@ -69,6 +70,8 @@ import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
@@ -80,6 +83,8 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
@@ -102,10 +107,12 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnOpenException;
import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -819,8 +826,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// Move the record from txn_components into completed_txn_components so that the compactor
// knows where to look to compact.
String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
- "ctc_table, ctc_partition) select tc_txnid, tc_database, tc_table, " +
- "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
+ "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " +
+ "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute insert <" + s + ">");
int modCount = 0;
if ((modCount = stmt.executeUpdate(s)) < 1) {
@@ -869,6 +876,244 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
@Override
+ @RetrySemantics.ReadOnly
+ public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
+ throws NoSuchTxnException, MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ValidTxnList validTxnList;
+
+ // We should prepare the valid write ids list based on the validTxnList of the current txn.
+ // If the caller has no open txn, it passes null for validTxnList, so we must fetch the
+ // current state of txns to build one.
+ if (rqst.isSetValidTxnList()) {
+ validTxnList = new ValidReadTxnList(rqst.getValidTxnList());
+ } else {
+ // Passing 0 for currentTxn means this validTxnList is not tied to any particular txn
+ validTxnList = TxnUtils.createValidReadTxnList(getOpenTxns(), 0);
+ }
+ try {
+ /**
+ * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
+ */
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+
+ // Get the valid write id list for all the tables read by the current txn
+ List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>();
+ for (String fullTableName : rqst.getFullTableNames()) {
+ tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, fullTableName, validTxnList));
+ }
+
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ GetValidWriteIdsResponse owr = new GetValidWriteIdsResponse(tblValidWriteIdsList);
+ return owr;
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "getValidWriteIds");
+ throw new MetaException("Unable to select from transaction database, "
+ + StringUtils.stringifyException(e));
+ } finally {
+ close(null, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ return getValidWriteIds(rqst);
+ }
+ }
+
+ // Method to get the valid write ids list for the given table.
+ // Input fullTableName is expected to be of the format <db_name>.<table_name>
+ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullTableName,
+ ValidTxnList validTxnList) throws SQLException {
+ ResultSet rs = null;
+ String[] names = TxnUtils.getDbTableName(fullTableName);
+ try {
+ // Initialize to 0 so that, if nobody has modified this table, the current txn
+ // doesn't read any data
+ long writeIdHwm = 0;
+ List<Long> invalidWriteIdList = new ArrayList<>();
+ long txnHwm = validTxnList.getHighWatermark();
+
+ // The output includes all the txns below the high water mark, committed transactions
+ // included. The results must be sorted in ascending order of write id, because the
+ // exceptions list in ValidWriteIdList is looked up via binary search.
+ String s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm
+ + " and t2w_database = " + quoteString(names[0])
+ + " and t2w_table = " + quoteString(names[1])
+ + " order by t2w_writeid asc";
+ LOG.debug("Going to execute query<" + s + ">");
+ rs = stmt.executeQuery(s);
+ long minOpenWriteId = Long.MAX_VALUE;
+ BitSet abortedBits = new BitSet();
+ while (rs.next()) {
+ long txnId = rs.getLong(1);
+ long writeId = rs.getLong(2);
+ writeIdHwm = Math.max(writeIdHwm, writeId);
+ if (validTxnList.isTxnValid(txnId)) {
+ // Skip if the transaction under evaluation is already committed.
+ continue;
+ }
+
+ // The current txn is either in open or aborted state.
+ // Mark the write ids state as per the txn state.
+ if (validTxnList.isTxnAborted(txnId)) {
+ invalidWriteIdList.add(writeId);
+ abortedBits.set(invalidWriteIdList.size() - 1);
+ } else {
+ invalidWriteIdList.add(writeId);
+ minOpenWriteId = Math.min(minOpenWriteId, writeId);
+ }
+ }
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
+ TableValidWriteIds owi = new TableValidWriteIds(fullTableName, writeIdHwm, invalidWriteIdList, byteBuffer);
+ if (minOpenWriteId < Long.MAX_VALUE) {
+ owi.setMinOpenWriteId(minOpenWriteId);
+ }
+ return owi;
+ } finally {
+ close(rs);
+ }
+ }
+
+ @Override
+ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException {
+ List<Long> txnIds = rqst.getTxnIds();
+ String dbName = rqst.getDbName().toLowerCase();
+ String tblName = rqst.getTableName().toLowerCase();
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ TxnStore.MutexAPI.LockHandle handle = null;
+ try {
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+
+ Collections.sort(txnIds); //easier to read logs
+
+ // Check if all the input txns are in open state. Write ID should be allocated only for open transactions.
+ if (!isTxnsInOpenState(txnIds, stmt)) {
+ ensureAllTxnsValid(dbName, tblName, txnIds, stmt);
+ throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+ }
+
+ List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
+ List<Long> allocatedTxns = new ArrayList<>();
+ long txnId;
+ long writeId;
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+
+ // Traverse TXN_TO_WRITE_ID to see if any of the input txns already has a write id
+ // allocated for the same db.table; if so, reuse it, else allocate a new one.
+ // A write id may already be allocated for multi-statement txns, where the first write
+ // on a table allocates the write id and the rest of the writes reuse it.
+ prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where"
+ + " t2w_database = " + quoteString(dbName)
+ + " and t2w_table = " + quoteString(tblName) + " and ");
+ suffix.append("");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ txnIds, "t2w_txnid", false, false);
+ for (String query : queries) {
+ LOG.debug("Going to execute query <" + query + ">");
+ rs = stmt.executeQuery(query);
+ while (rs.next()) {
+ // If table write ID is already allocated for the given transaction, then just use it
+ txnId = rs.getLong(1);
+ writeId = rs.getLong(2);
+ txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
+ allocatedTxns.add(txnId);
+ LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId);
+ }
+ }
+
+ // If all the txns in the list have already allocated write ids, then just skip new allocations
+ long numOfWriteIds = txnIds.size() - allocatedTxns.size();
+ assert(numOfWriteIds >= 0);
+ if (0 == numOfWriteIds) {
+ // If all the txns in the list have pre-allocated write ids for the given table, then just return
+ return new AllocateTableWriteIdsResponse(txnToWriteIds);
+ }
+
+ handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
+
+ // Some txns in the list have no write id allocated yet, so allocate them now.
+ // Get the next write id for the given table and advance it by the number of ids allocated.
+ // This is a select-for-update query, which takes a lock if the table entry already exists in NEXT_WRITE_ID
+ String s = sqlGenerator.addForUpdateClause(
+ "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
+ + " and nwi_table = " + quoteString(tblName));
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ // The first write id allocation adds the table to the NEXT_WRITE_ID meta table.
+ // Write ids start at 1, so the stored next value is 1 plus the number of write ids allocated here
+ s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
+ + quoteString(dbName) + "," + quoteString(tblName) + "," + String.valueOf(numOfWriteIds + 1) + ")";
+ LOG.debug("Going to execute insert <" + s + ">");
+ stmt.execute(s);
+ writeId = 1;
+ } else {
+ // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
+ writeId = rs.getLong(1);
+ s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds)
+ + " where nwi_database = " + quoteString(dbName)
+ + " and nwi_table = " + quoteString(tblName);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ }
+
+ // Map the newly allocated write ids against the txns which don't have pre-allocated
+ // write ids
+ List<String> rows = new ArrayList<>();
+ for (long txn : txnIds) {
+ if (allocatedTxns.contains(txn)) {
+ continue;
+ }
+ rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
+ txnToWriteIds.add(new TxnToWriteId(txn, writeId));
+ LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
+ writeId++;
+ }
+
+ // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
+ List<String> inserts = sqlGenerator.createInsertValuesStmt(
+ "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
+ for (String insert : inserts) {
+ LOG.debug("Going to execute insert <" + insert + ">");
+ stmt.execute(insert);
+ }
+
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ return new AllocateTableWriteIdsResponse(txnToWriteIds);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")");
+ throw new MetaException("Unable to update transaction database "
+ + StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ return allocateTableWriteIds(rqst);
+ }
+ }
+
+ @Override
@RetrySemantics.SafeToRetry
public void performWriteSetGC() {
Connection dbConn = null;
@@ -1122,13 +1367,30 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
String dbName = lc.getDbname();
String tblName = lc.getTablename();
String partName = lc.getPartitionname();
+ Long writeId = null;
+ if (tblName != null) {
+ // It is assumed the caller has already allocated a write id for adding/updating data to
+ // the acid tables. However, DDL operations won't allocate a write id, and hence this query
+ // may return an empty result set.
+ // Get the write id allocated by this txn for the given table writes
+ s = "select t2w_writeid from TXN_TO_WRITE_ID where"
+ + " t2w_database = " + quoteString(dbName.toLowerCase())
+ + " and t2w_table = " + quoteString(tblName.toLowerCase())
+ + " and t2w_txnid = " + txnid;
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (rs.next()) {
+ writeId = rs.getLong(1);
+ }
+ }
rows.add(txnid + ", '" + dbName + "', " +
- (tblName == null ? "null" : "'" + tblName + "'") + ", " +
- (partName == null ? "null" : "'" + partName + "'")+ "," +
- quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString()));
+ (tblName == null ? "null" : "'" + tblName + "'") + ", " +
+ (partName == null ? "null" : "'" + partName + "'")+ "," +
+ quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())+ "," +
+ (writeId == null ? "null" : writeId));
}
List<String> queries = sqlGenerator.createInsertValuesStmt(
- "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
+ "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
for(String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
int modCount = stmt.executeUpdate(query);
@@ -1810,7 +2072,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Connection dbConn = null;
Statement stmt = null;
ResultSet lockHandle = null;
- ResultSet rs = null;
try {
try {
lockInternal();
@@ -1827,15 +2088,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if(rqst.isSetOperationType()) {
ot = OpertaionType.fromDataOperationType(rqst.getOperationType());
}
+
+ Long writeId = rqst.getWriteid();
List<String> rows = new ArrayList<>();
for (String partName : rqst.getPartitionnames()) {
rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
- "," + quoteString(partName) + "," + quoteChar(ot.sqlConst));
+ "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId);
}
int modCount = 0;
//record partitions that were written to
List<String> queries = sqlGenerator.createInsertValuesStmt(
- "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
+ "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
for(String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
modCount = stmt.executeUpdate(query);
@@ -1880,7 +2143,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
StringBuilder buff = new StringBuilder();
switch (type) {
- case DATABASE:
+ case DATABASE: {
dbName = db.getName();
buff.append("delete from TXN_COMPONENTS where tc_database='");
@@ -1906,8 +2169,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
buff.append("'");
queries.add(buff.toString());
+ buff.setLength(0);
+ buff.append("delete from TXN_TO_WRITE_ID where t2w_database='");
+ buff.append(dbName.toLowerCase());
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from NEXT_WRITE_ID where nwi_database='");
+ buff.append(dbName.toLowerCase());
+ buff.append("'");
+ queries.add(buff.toString());
+
break;
- case TABLE:
+ }
+ case TABLE: {
dbName = table.getDbName();
tblName = table.getTableName();
@@ -1942,8 +2218,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
buff.append("'");
queries.add(buff.toString());
+ buff.setLength(0);
+ buff.append("delete from TXN_TO_WRITE_ID where t2w_database='");
+ buff.append(dbName.toLowerCase());
+ buff.append("' and t2w_table='");
+ buff.append(tblName.toLowerCase());
+ buff.append("'");
+ queries.add(buff.toString());
+
+ buff.setLength(0);
+ buff.append("delete from NEXT_WRITE_ID where nwi_database='");
+ buff.append(dbName.toLowerCase());
+ buff.append("' and nwi_table='");
+ buff.append(tblName.toLowerCase());
+ buff.append("'");
+ queries.add(buff.toString());
+
break;
- case PARTITION:
+ }
+ case PARTITION: {
dbName = table.getDbName();
tblName = table.getTableName();
List<FieldSchema> partCols = table.getPartitionKeys(); // partition columns
@@ -1996,8 +2289,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
break;
- default:
+ }
+ default: {
throw new MetaException("Invalid object type for cleanup: " + type);
+ }
}
for (String query : queries) {
@@ -3003,6 +3298,115 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
/**
+ * Checks if all the txns in the list are in open state.
+ * @param txnIds list of txns to be evaluated for open state
+ * @param stmt db statement
+ * @return true if all the txns are in open state, false otherwise
+ */
+ private boolean isTxnsInOpenState(List<Long> txnIds, Statement stmt) throws SQLException {
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+
+ // Count how many txns from the given list are in open state. If the returned count equals
+ // the number of input txns, then all of them are open.
+ prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN + "' and ");
+ suffix.append("");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ txnIds, "txn_id", false, false);
+
+ long count = 0;
+ for (String query : queries) {
+ LOG.debug("Going to execute query <" + query + ">");
+ ResultSet rs = stmt.executeQuery(query);
+ if (rs.next()) {
+ count += rs.getLong(1);
+ }
+ }
+ return count == txnIds.size();
+ }
+
+ /**
+ * Checks that all the txns in the list are in open state and raises an error if any is aborted or committed.
+ * @param dbName Database name
+ * @param tblName Table on which we try to allocate write id
+ * @param txnIds list of txns to be evaluated for open state
+ * @param stmt db statement
+ */
+ private void ensureAllTxnsValid(String dbName, String tblName, List<Long> txnIds, Statement stmt)
+ throws SQLException {
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+
+ // Check if any of the txns in the list is aborted.
+ prefix.append("select txn_id, txn_state from TXNS where ");
+ suffix.append("");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ txnIds, "txn_id", false, false);
+ Long txnId;
+ char txnState;
+ boolean isAborted = false;
+ StringBuilder errorMsg = new StringBuilder();
+ errorMsg.append("Write ID allocation on ")
+ .append(Warehouse.getQualifiedName(dbName, tblName))
+ .append(" failed for input txns: ");
+ for (String query : queries) {
+ LOG.debug("Going to execute query <" + query + ">");
+ ResultSet rs = stmt.executeQuery(query);
+ while (rs.next()) {
+ txnId = rs.getLong(1);
+ txnState = rs.getString(2).charAt(0);
+ if (txnState != TXN_OPEN) {
+ isAborted = true;
+ errorMsg.append("{").append(txnId).append(",").append(txnState).append("}");
+ }
+ }
+ }
+ // Check if any of the txns in the list is committed.
+ boolean isCommitted = checkIfTxnsCommitted(txnIds, stmt, errorMsg);
+ if (isAborted || isCommitted) {
+ LOG.error(errorMsg.toString());
+ throw new IllegalStateException("Write ID allocation failed on "
+ + Warehouse.getQualifiedName(dbName, tblName)
+ + " as not all input txns in open state");
+ }
+ }
+
+ /**
+ * Checks if any of the txns in the list is committed.
+ * @param txnIds list of txns to be checked for committed state
+ * @param stmt db statement
+ * @return true if any input txn is committed, else false
+ */
+ private boolean checkIfTxnsCommitted(List<Long> txnIds, Statement stmt, StringBuilder errorMsg)
+ throws SQLException {
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+
+ // Check if any of the txns in the list is committed; if so, record it in errorMsg.
+ prefix.append("select ctc_txnid from COMPLETED_TXN_COMPONENTS where ");
+ suffix.append("");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ txnIds, "ctc_txnid", false, false);
+ Long txnId;
+ boolean isCommitted = false;
+ for (String query : queries) {
+ LOG.debug("Going to execute query <" + query + ">");
+ ResultSet rs = stmt.executeQuery(query);
+ while (rs.next()) {
+ isCommitted = true;
+ txnId = rs.getLong(1);
+ if (errorMsg != null) {
+ errorMsg.append("{").append(txnId).append(",c}");
+ }
+ }
+ }
+ return isCommitted;
+ }
+
+ /**
* Used to raise an informative error when the caller expected a txn in a particular TxnStatus
* but found it in some other status
*/
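End to end, a client of the two new TxnStore methods looks roughly like this (a sketch assuming the usual Thrift-generated no-arg constructors and accessors on the request/response structs; txn 7 is assumed to be open):

    import java.util.Arrays;

    import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
    import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
    import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
    import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
    import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;

    public class AllocateWriteIdsDemo {
      static void demo(TxnStore txnStore) throws Exception {
        // Writer side: allocate a per-table write id for open txn 7.
        AllocateTableWriteIdsRequest alloc = new AllocateTableWriteIdsRequest();
        alloc.setDbName("db1");
        alloc.setTableName("tbl1");
        alloc.setTxnIds(Arrays.asList(7L));
        AllocateTableWriteIdsResponse resp = txnStore.allocateTableWriteIds(alloc);
        for (TxnToWriteId t2w : resp.getTxnToWriteIds()) {
          System.out.println("txn " + t2w.getTxnId() + " -> write id " + t2w.getWriteId());
        }

        // Reader side: fetch the per-table valid write id list for the same table.
        GetValidWriteIdsRequest get = new GetValidWriteIdsRequest();
        get.setFullTableNames(Arrays.asList("db1.tbl1"));
        GetValidWriteIdsResponse ids = txnStore.getValidWriteIds(get);
        System.out.println(ids.getTblValidWriteIds().size()); // one entry, for db1.tbl1
      }
    }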
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 3e27034..38fa0e2 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -38,8 +38,10 @@ import java.util.Set;
@InterfaceStability.Evolving
public interface TxnStore extends Configurable {
- enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
- WriteSetCleaner, CompactionScheduler}
+ enum MUTEX_KEY {
+ Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
+ WriteSetCleaner, CompactionScheduler, WriteIdAllocator
+ }
// Compactor states (Should really be enum)
String INITIATED_RESPONSE = "initiated";
String WORKING_RESPONSE = "working";
@@ -123,6 +125,25 @@ public interface TxnStore extends Configurable {
public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
String inputDbName, String inputTableName, ValidTxnList txnList)
throws MetaException;
+ /**
+ * Gets the list of valid write ids for the given tables with respect to the current txn
+ * @param rqst info on transaction and list of table names associated with given transaction
+ * @throws NoSuchTxnException
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
+ throws NoSuchTxnException, MetaException;
+
+ /**
+ * Allocate a write ID for the given table and associate it with a transaction
+ * @param rqst info on transaction and table to allocate write id
+ * @throws NoSuchTxnException
+ * @throws TxnAbortedException
+ * @throws MetaException
+ */
+ AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIdsRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException;
/**
* Obtain a lock.
@@ -206,7 +227,7 @@ public interface TxnStore extends Configurable {
CompactionResponse compact(CompactionRequest rqst) throws MetaException;
/**
- * Show list of current compactions
+ * Show list of current compactions.
* @param rqst info on which compactions to show
* @return compaction information
* @throws MetaException
@@ -226,7 +247,7 @@ public interface TxnStore extends Configurable {
throws NoSuchTxnException, TxnAbortedException, MetaException;
/**
- * Clean up corresponding records in metastore tables
+ * Clean up corresponding records in metastore tables.
* @param type Hive object type
* @param db database object
* @param table table object
@@ -350,10 +371,10 @@ public interface TxnStore extends Configurable {
List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException;
/**
- * Record the highest txn id that the {@code ci} compaction job will pay attention to.
+ * Record the highest write id that the {@code ci} compaction job will pay attention to.
*/
@RetrySemantics.Idempotent
- void setCompactionHighestTxnId(CompactionInfo ci, long highestTxnId) throws MetaException;
+ void setCompactionHighestWriteId(CompactionInfo ci, long highestWriteId) throws MetaException;
/**
* For any given compactable entity (partition, table if not partitioned) the history of compactions
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 027fb3f..7b02865 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -18,15 +18,16 @@
package org.apache.hadoop.hive.metastore.txn;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ValidCompactorTxnList;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnInfo;
-import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -63,53 +64,94 @@ public class TxnUtils {
BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
int i = 0;
- for(long txn: open) {
+ for (long txn : open) {
if (currentTxn > 0 && currentTxn == txn) continue;
exceptions[i++] = txn;
}
- if(txns.isSetMin_open_txn()) {
+ if (txns.isSetMin_open_txn()) {
return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn());
- }
- else {
+ } else {
return new ValidReadTxnList(exceptions, abortedBits, highWater);
}
}
/**
- * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
- * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to
- * compact the files, and thus treats only open transactions as invalid. Additionally any
- * txnId > highestOpenTxnId is also invalid. This is to avoid creating something like
- * delta_17_120 where txnId 80, for example, is still open.
- * @param txns txn list from the metastore
- * @return a valid txn list.
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse} to a
+ * {@link org.apache.hadoop.hive.common.ValidTxnWriteIdList}. This assumes that the caller intends to
+ * read the files, and thus treats both open and aborted transactions as invalid.
+ * @param currentTxnId current txn ID for which we get the valid write ids list
+ * @param validWriteIds valid write ids list from the metastore
+ * @return a valid write IDs list for the whole transaction.
*/
- public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
- //highWater is the last txn id that has been allocated
- long highWater = txns.getTxn_high_water_mark();
- long minOpenTxn = Long.MAX_VALUE;
- long[] exceptions = new long[txns.getOpen_txnsSize()];
+ public static ValidTxnWriteIdList createValidTxnWriteIdList(Long currentTxnId,
+ GetValidWriteIdsResponse validWriteIds) {
+ ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(currentTxnId);
+ for (TableValidWriteIds tableWriteIds : validWriteIds.getTblValidWriteIds()) {
+ validTxnWriteIdList.addTableValidWriteIdList(createValidReaderWriteIdList(tableWriteIds));
+ }
+ return validTxnWriteIdList;
+ }
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
+ * {@link org.apache.hadoop.hive.common.ValidReaderWriteIdList}. This assumes that the caller intends to
+ * read the files, and thus treats both open and aborted write ids as invalid.
+ * @param tableWriteIds valid write ids for the given table from the metastore
+ * @return a valid write IDs list for the input table
+ */
+ public static ValidReaderWriteIdList createValidReaderWriteIdList(TableValidWriteIds tableWriteIds) {
+ String fullTableName = tableWriteIds.getFullTableName();
+ long highWater = tableWriteIds.getWriteIdHighWaterMark();
+ List<Long> invalids = tableWriteIds.getInvalidWriteIds();
+ BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits());
+ long[] exceptions = new long[invalids.size()];
int i = 0;
- for (TxnInfo txn : txns.getOpen_txns()) {
- if (txn.getState() == TxnState.OPEN) {
- minOpenTxn = Math.min(minOpenTxn, txn.getId());
- }
- else {
- //only need aborted since we don't consider anything above minOpenTxn
- exceptions[i++] = txn.getId();
+ for (long writeId : invalids) {
+ exceptions[i++] = writeId;
+ }
+ if (tableWriteIds.isSetMinOpenWriteId()) {
+ return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater,
+ tableWriteIds.getMinOpenWriteId());
+ } else {
+ return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits, highWater);
+ }
+ }
+
+ /**
+ * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
+ * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to
+ * compact the files, and thus treats only open transactions/write ids as invalid. Additionally any
+ * writeId > highestOpenWriteId is also invalid. This is to avoid creating something like
+ * delta_17_120 where writeId 80, for example, is still open.
+ * @param tableValidWriteIds table write id list from the metastore
+ * @return a valid write id list.
+ */
+ public static ValidCompactorWriteIdList createValidCompactWriteIdList(TableValidWriteIds tableValidWriteIds) {
+ String fullTableName = tableValidWriteIds.getFullTableName();
+ long highWater = tableValidWriteIds.getWriteIdHighWaterMark();
+ long minOpenWriteId = Long.MAX_VALUE;
+ List<Long> invalids = tableValidWriteIds.getInvalidWriteIds();
+ BitSet abortedBits = BitSet.valueOf(tableValidWriteIds.getAbortedBits());
+ long[] exceptions = new long[invalids.size()];
+ int i = 0;
+ for (long writeId : invalids) {
+ if (abortedBits.get(i)) {
+ // Only need aborted since we don't consider anything above minOpenWriteId
+ exceptions[i++] = writeId;
+ } else {
+ minOpenWriteId = Math.min(minOpenWriteId, writeId);
}
}
if(i < exceptions.length) {
exceptions = Arrays.copyOf(exceptions, i);
}
- highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
+ highWater = minOpenWriteId == Long.MAX_VALUE ? highWater : minOpenWriteId - 1;
BitSet bitSet = new BitSet(exceptions.length);
- bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions are aborted
- if(minOpenTxn == Long.MAX_VALUE) {
- return new ValidCompactorTxnList(exceptions, bitSet, highWater);
- }
- else {
- return new ValidCompactorTxnList(exceptions, bitSet, highWater, minOpenTxn);
+ bitSet.set(0, exceptions.length); // for ValidCompactorWriteIdList, everything in exceptions is aborted
+ if (minOpenWriteId == Long.MAX_VALUE) {
+ return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater);
+ } else {
+ return new ValidCompactorWriteIdList(fullTableName, exceptions, bitSet, highWater, minOpenWriteId);
}
}
@@ -134,7 +176,7 @@ public class TxnUtils {
* Note, users are responsible for using the correct TxnManager. We do not look at
* SessionState.get().getTxnMgr().supportsAcid() here
* Should produce the same result as
- * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}
+ * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isTransactionalTable(org.apache.hadoop.hive.ql.metadata.Table)}.
* @return true if table is a transactional table, false otherwise
*/
public static boolean isTransactionalTable(Table table) {
@@ -148,7 +190,7 @@ public class TxnUtils {
/**
* Should produce the same result as
- * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}
+ * {@link org.apache.hadoop.hive.ql.io.AcidUtils#isAcidTable(org.apache.hadoop.hive.ql.metadata.Table)}.
*/
public static boolean isAcidTable(Table table) {
return TxnUtils.isTransactionalTable(table) &&
@@ -157,6 +199,19 @@ public class TxnUtils {
}
/**
+ * Should produce a result of the form <dbName>.<tableName>.
+ */
+ public static String getFullTableName(String dbName, String tableName) {
+ return dbName.toLowerCase() + "." + tableName.toLowerCase();
+ }
+
+ public static String[] getDbTableName(String fullTableName) {
+ return fullTableName.split("\\.");
+ }
+
+
+
+ /**
* Build a query (or queries if one query is too big but only for the case of 'IN'
* composite clause. For the case of 'NOT IN' clauses, multiple queries change
* the semantics of the intended query.
@@ -357,7 +412,7 @@ public class TxnUtils {
return ret;
}
- /*
+ /**
* Compute and return the size of a query statement with the given parameters as input variables.
*
* @param sizeSoFar size of the current contents of the buf
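To make the high-water-mark clamping in createValidCompactWriteIdList concrete, a small sketch with made-up values: one open write id pulls the compactor's high water mark down, which is exactly what prevents a delta like delta_17_120 from covering a still-open write id 80.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
    import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
    import org.apache.hadoop.hive.metastore.txn.TxnUtils;

    public class CompactorHwmDemo {
      public static void main(String[] args) {
        // Invalid write ids for db1.tbl1: 5 (aborted) and 7 (open); metastore HWM is 9.
        BitSet abortedBits = new BitSet();
        abortedBits.set(0); // first entry of the invalid list (write id 5) is aborted
        TableValidWriteIds ids = new TableValidWriteIds("db1.tbl1", 9,
            Arrays.asList(5L, 7L), ByteBuffer.wrap(abortedBits.toByteArray()));
        ids.setMinOpenWriteId(7L);

        ValidCompactorWriteIdList list = TxnUtils.createValidCompactWriteIdList(ids);
        // The HWM is clamped to minOpenWriteId - 1 = 6, so the compactor never
        // produces a delta spanning the still-open write id 7.
        System.out.println(list.getHighWatermark()); // 6
      }
    }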
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
index ac28869..9d8a703 100644
--- a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
@@ -422,7 +422,8 @@ CREATE TABLE TXN_COMPONENTS (
TC_DATABASE varchar(128) NOT NULL,
TC_TABLE varchar(128),
TC_PARTITION varchar(767),
- TC_OPERATION_TYPE char(1) NOT NULL
+ TC_OPERATION_TYPE char(1) NOT NULL,
+ TC_WRITEID bigint
);
CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
@@ -432,7 +433,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
CTC_DATABASE varchar(128) NOT NULL,
CTC_TABLE varchar(256),
CTC_PARTITION varchar(767),
- CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+ CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ CTC_WRITEID bigint
);
CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -480,7 +482,7 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_WORKER_ID varchar(128),
CQ_START bigint,
CQ_RUN_AS varchar(128),
- CQ_HIGHEST_TXN_ID bigint,
+ CQ_HIGHEST_WRITE_ID bigint,
CQ_META_INFO varchar(2048) for bit data,
CQ_HADOOP_JOB_ID varchar(32)
);
@@ -502,7 +504,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
CC_START bigint,
CC_END bigint,
CC_RUN_AS varchar(128),
- CC_HIGHEST_TXN_ID bigint,
+ CC_HIGHEST_WRITE_ID bigint,
CC_META_INFO varchar(2048) for bit data,
CC_HADOOP_JOB_ID varchar(32)
);
@@ -525,6 +527,23 @@ CREATE TABLE WRITE_SET (
WS_OPERATION_TYPE char(1) NOT NULL
);
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
index d49255a..a50c45d 100644
--- a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
@@ -94,3 +94,29 @@ UPDATE SDS
UPDATE DBS
SET DB_LOCATION_URI = 's3a' || SUBSTR(DB_LOCATION_URI, 4)
WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- 050-HIVE-18192.derby.sql
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+RENAME COLUMN COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+
+RENAME COLUMN COMPLETED_COMPACTIONS.CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
index 7c26d5d..1b7d0da 100644
--- a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
@@ -969,7 +969,7 @@ CREATE TABLE COMPACTION_QUEUE(
CQ_WORKER_ID nvarchar(128) NULL,
CQ_START bigint NULL,
CQ_RUN_AS nvarchar(128) NULL,
- CQ_HIGHEST_TXN_ID bigint NULL,
+ CQ_HIGHEST_WRITE_ID bigint NULL,
CQ_META_INFO varbinary(2048) NULL,
CQ_HADOOP_JOB_ID nvarchar(128) NULL,
PRIMARY KEY CLUSTERED
@@ -990,7 +990,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
CC_START bigint NULL,
CC_END bigint NULL,
CC_RUN_AS nvarchar(128) NULL,
- CC_HIGHEST_TXN_ID bigint NULL,
+ CC_HIGHEST_WRITE_ID bigint NULL,
CC_META_INFO varbinary(2048) NULL,
CC_HADOOP_JOB_ID nvarchar(128) NULL,
PRIMARY KEY CLUSTERED
@@ -1004,7 +1004,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS(
CTC_DATABASE nvarchar(128) NOT NULL,
CTC_TABLE nvarchar(128) NULL,
CTC_PARTITION nvarchar(767) NULL,
- CTC_TIMESTAMP datetime2 DEFAULT CURRENT_TIMESTAMP NOT NULL
+ CTC_TIMESTAMP datetime2 DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ CTC_WRITEID bigint
);
CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX2 ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -1072,7 +1073,8 @@ CREATE TABLE TXN_COMPONENTS(
TC_DATABASE nvarchar(128) NOT NULL,
TC_TABLE nvarchar(128) NULL,
TC_PARTITION nvarchar(767) NULL,
- TC_OPERATION_TYPE char(1) NOT NULL
+ TC_OPERATION_TYPE char(1) NOT NULL,
+ TC_WRITEID bigint
);
ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
@@ -1129,6 +1131,23 @@ CREATE TABLE METASTORE_DB_PROPERTIES (
ALTER TABLE METASTORE_DB_PROPERTIES ADD CONSTRAINT PROPERTY_KEY_PK PRIMARY KEY (PROPERTY_KEY);
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE nvarchar(128) NOT NULL,
+ T2W_TABLE nvarchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE nvarchar(128) NOT NULL,
+ NWI_TABLE nvarchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
index 6dc3e1a..8ab466d 100644
--- a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
@@ -148,3 +148,29 @@ UPDATE SDS
UPDATE DBS
SET DB_LOCATION_URI = 's3a' + SUBSTRING(DB_LOCATION_URI, 4, LEN(DB_LOCATION_URI))
WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE nvarchar(128) NOT NULL,
+ T2W_TABLE nvarchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE nvarchar(128) NOT NULL,
+ NWI_TABLE nvarchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+EXEC SP_RENAME 'COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID', 'CQ_HIGHEST_WRITE_ID', 'COLUMN';
+
+EXEC SP_RENAME 'COMPLETED_COMPACTIONS.CC_HIGHEST_TXN_ID', 'CC_HIGHEST_WRITE_ID', 'COLUMN';
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
index 0eb2e2e..886c932 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
@@ -962,6 +962,7 @@ CREATE TABLE TXN_COMPONENTS (
TC_TABLE varchar(128) NOT NULL,
TC_PARTITION varchar(767),
TC_OPERATION_TYPE char(1) NOT NULL,
+ TC_WRITEID bigint,
FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -972,7 +973,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
CTC_DATABASE varchar(128) NOT NULL,
CTC_TABLE varchar(256),
CTC_PARTITION varchar(767),
- CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+ CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ CTC_WRITEID bigint
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX2 ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION) USING BTREE;
@@ -1021,7 +1023,7 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_WORKER_ID varchar(128),
CQ_START bigint,
CQ_RUN_AS varchar(128),
- CQ_HIGHEST_TXN_ID bigint,
+ CQ_HIGHEST_WRITE_ID bigint,
CQ_META_INFO varbinary(2048),
CQ_HADOOP_JOB_ID varchar(32)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -1038,7 +1040,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
CC_START bigint,
CC_END bigint,
CC_RUN_AS varchar(128),
- CC_HIGHEST_TXN_ID bigint,
+ CC_HIGHEST_WRITE_ID bigint,
CC_META_INFO varbinary(2048),
CC_HADOOP_JOB_ID varchar(32)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -1063,6 +1065,24 @@ CREATE TABLE WRITE_SET (
WS_COMMIT_ID bigint NOT NULL,
WS_OPERATION_TYPE char(1) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
index 0a170f6..a537734 100644
--- a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
@@ -133,3 +133,29 @@ UPDATE SDS
UPDATE DBS
SET DB_LOCATION_URI = CONCAT('s3a', SUBSTR(DB_LOCATION_URI, 4, LENGTH(DB_LOCATION_URI)))
WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE CHANGE `CQ_HIGHEST_TXN_ID` `CQ_HIGHEST_WRITE_ID` bigint;
+
+ALTER TABLE COMPLETED_COMPACTIONS CHANGE `CC_HIGHEST_TXN_ID` `CC_HIGHEST_WRITE_ID` bigint;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
index 37f9063..366b2d9 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
@@ -936,7 +936,8 @@ CREATE TABLE TXN_COMPONENTS (
TC_DATABASE VARCHAR2(128) NOT NULL,
TC_TABLE VARCHAR2(256),
TC_PARTITION VARCHAR2(767) NULL,
- TC_OPERATION_TYPE char(1) NOT NULL
+ TC_OPERATION_TYPE char(1) NOT NULL,
+ TC_WRITEID NUMBER(19)
) ROWDEPENDENCIES;
CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
@@ -946,7 +947,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
CTC_DATABASE VARCHAR2(128) NOT NULL,
CTC_TABLE VARCHAR2(128),
CTC_PARTITION VARCHAR2(767),
- CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+ CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ CTC_WRITEID NUMBER(19)
) ROWDEPENDENCIES;
CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -994,7 +996,7 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_WORKER_ID varchar(128),
CQ_START NUMBER(19),
CQ_RUN_AS varchar(128),
- CQ_HIGHEST_TXN_ID NUMBER(19),
+ CQ_HIGHEST_WRITE_ID NUMBER(19),
CQ_META_INFO BLOB,
CQ_HADOOP_JOB_ID varchar2(32)
) ROWDEPENDENCIES;
@@ -1016,7 +1018,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
CC_START NUMBER(19),
CC_END NUMBER(19),
CC_RUN_AS varchar(128),
- CC_HIGHEST_TXN_ID NUMBER(19),
+ CC_HIGHEST_WRITE_ID NUMBER(19),
CC_META_INFO BLOB,
CC_HADOOP_JOB_ID varchar2(32)
) ROWDEPENDENCIES;
@@ -1037,6 +1039,23 @@ CREATE TABLE WRITE_SET (
WS_OPERATION_TYPE char(1) NOT NULL
);
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID number(19) NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
index a923d92..bd786fb 100644
--- a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
@@ -156,3 +156,29 @@ UPDATE SDS
UPDATE DBS
SET DB_LOCATION_URI = 's3a' || SUBSTR(DB_LOCATION_URI, 4)
WHERE DB_LOCATION_URI LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID number(19) NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT number(19) NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE RENAME COLUMN CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+
+ALTER TABLE COMPLETED_COMPACTIONS RENAME COLUMN CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID number(19);
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID number(19);
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
index 9d63056..4abf24c 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
@@ -1628,7 +1628,8 @@ CREATE TABLE TXN_COMPONENTS (
TC_DATABASE varchar(128) NOT NULL,
TC_TABLE varchar(128),
TC_PARTITION varchar(767) DEFAULT NULL,
- TC_OPERATION_TYPE char(1) NOT NULL
+ TC_OPERATION_TYPE char(1) NOT NULL,
+ TC_WRITEID bigint
);
CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS USING hash (TC_TXNID);
@@ -1638,7 +1639,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
CTC_DATABASE varchar(128) NOT NULL,
CTC_TABLE varchar(256),
CTC_PARTITION varchar(767),
- CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL
+ CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
+ CTC_WRITEID bigint
);
CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS USING btree (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
@@ -1686,7 +1688,7 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_WORKER_ID varchar(128),
CQ_START bigint,
CQ_RUN_AS varchar(128),
- CQ_HIGHEST_TXN_ID bigint,
+ CQ_HIGHEST_WRITE_ID bigint,
CQ_META_INFO bytea,
CQ_HADOOP_JOB_ID varchar(32)
);
@@ -1708,7 +1710,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
CC_START bigint,
CC_END bigint,
CC_RUN_AS varchar(128),
- CC_HIGHEST_TXN_ID bigint,
+ CC_HIGHEST_WRITE_ID bigint,
CC_META_INFO bytea,
CC_HADOOP_JOB_ID varchar(32)
);
@@ -1729,6 +1731,23 @@ CREATE TABLE WRITE_SET (
WS_OPERATION_TYPE char(1) NOT NULL
);
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
index eb45cd2..34ed974 100644
--- a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
@@ -172,3 +172,29 @@ UPDATE "SDS"
UPDATE "DBS"
SET "DB_LOCATION_URI" = 's3a' || SUBSTR("DB_LOCATION_URI", 4)
WHERE "DB_LOCATION_URI" LIKE 's3n://%' ;
+
+-- HIVE-18192
+CREATE TABLE TXN_TO_WRITE_ID (
+ T2W_TXNID bigint NOT NULL,
+ T2W_DATABASE varchar(128) NOT NULL,
+ T2W_TABLE varchar(256) NOT NULL,
+ T2W_WRITEID bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
+
+CREATE TABLE NEXT_WRITE_ID (
+ NWI_DATABASE varchar(128) NOT NULL,
+ NWI_TABLE varchar(256) NOT NULL,
+ NWI_NEXT bigint NOT NULL
+);
+
+CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
+
+ALTER TABLE COMPACTION_QUEUE RENAME CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID;
+
+ALTER TABLE COMPLETED_COMPACTIONS RENAME CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID;
+
+-- Modify txn_components/completed_txn_components tables to add write id.
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint;
+ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
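
Note that the four upgrade scripts perform the same logical column rename in
four different dialects: SP_RENAME on SQL Server, CHANGE on MySQL, RENAME
COLUMN on Oracle, and RENAME on Postgres. A hypothetical helper that a schema
tool could use to pick the right statement, with the statements quoted straight
from the scripts above:

// Illustrative helper only; the statements are copied from the upgrade scripts.
static String renameCqHighestTxnIdColumn(String dbType) {
  switch (dbType) {
    case "mssql":
      return "EXEC SP_RENAME 'COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID', 'CQ_HIGHEST_WRITE_ID', 'COLUMN'";
    case "mysql":
      return "ALTER TABLE COMPACTION_QUEUE CHANGE `CQ_HIGHEST_TXN_ID` `CQ_HIGHEST_WRITE_ID` bigint";
    case "oracle":
      return "ALTER TABLE COMPACTION_QUEUE RENAME COLUMN CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID";
    case "postgres":
      return "ALTER TABLE COMPACTION_QUEUE RENAME CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID";
    default:
      throw new IllegalArgumentException("unexpected db type: " + dbType);
  }
}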
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/standalone-metastore/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift
index 35fc8b3..b11ee38 100644
--- a/standalone-metastore/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift
@@ -731,6 +731,43 @@ struct CommitTxnRequest {
1: required i64 txnid,
}
+// Request msg to get the valid write id list for the given tables wrt the input validTxnList
+struct GetValidWriteIdsRequest {
+ 1: required list<string> fullTableNames, // Full table names of the format <db_name>.<table_name>
+ 2: required string validTxnList, // Valid txn list string wrt the current txn of the caller
+}
+
+// Valid Write ID list of one table wrt the current txn
+struct TableValidWriteIds {
+ 1: required string fullTableName, // Full table name of the format <db_name>.<table_name>
+ 2: required i64 writeIdHighWaterMark, // The highest write id valid for this table wrt the given txn
+ 3: required list<i64> invalidWriteIds, // List of open and aborted write ids in the table
+ 4: optional i64 minOpenWriteId, // Minimum write id which maps to an open txn
+ 5: required binary abortedBits, // Bit array to identify the aborted write ids in the invalidWriteIds list
+}
+
+// Valid Write ID list for all the input tables wrt the current txn
+struct GetValidWriteIdsResponse {
+ 1: required list<TableValidWriteIds> tblValidWriteIds,
+}
+
+// Request msg to allocate table write ids for the given list of txns
+struct AllocateTableWriteIdsRequest {
+ 1: required list<i64> txnIds,
+ 2: required string dbName,
+ 3: required string tableName,
+}
+
+// Mapping of the allocated write id to the txn for which it was allocated
+struct TxnToWriteId {
+ 1: required i64 txnId,
+ 2: required i64 writeId,
+}
+
+struct AllocateTableWriteIdsResponse {
+ 1: required list<TxnToWriteId> txnToWriteIds,
+}
+
struct LockComponent {
1: required LockType type,
2: required LockLevel level,
@@ -850,10 +887,11 @@ struct ShowCompactResponse {
struct AddDynamicPartitions {
1: required i64 txnid,
- 2: required string dbname,
- 3: required string tablename,
- 4: required list<string> partitionnames,
- 5: optional DataOperationType operationType = DataOperationType.UNSET
+ 2: required i64 writeid,
+ 3: required string dbname,
+ 4: required string tablename,
+ 5: required list<string> partitionnames,
+ 6: optional DataOperationType operationType = DataOperationType.UNSET
}
struct BasicTxnInfo {
@@ -1807,6 +1845,10 @@ service ThriftHiveMetastore extends fb303.FacebookService
void abort_txn(1:AbortTxnRequest rqst) throws (1:NoSuchTxnException o1)
void abort_txns(1:AbortTxnsRequest rqst) throws (1:NoSuchTxnException o1)
void commit_txn(1:CommitTxnRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2)
+ GetValidWriteIdsResponse get_valid_write_ids(1:GetValidWriteIdsRequest rqst)
+ throws (1:NoSuchTxnException o1, 2:MetaException o2)
+ AllocateTableWriteIdsResponse allocate_table_write_ids(1:AllocateTableWriteIdsRequest rqst)
+ throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2, 3:MetaException o3)
LockResponse lock(1:LockRequest rqst) throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2)
LockResponse check_lock(1:CheckLockRequest rqst)
throws (1:NoSuchTxnException o1, 2:TxnAbortedException o2, 3:NoSuchLockException o3)
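
To make the intended call flow concrete, here is a sketch of driving the two
new RPCs through the Thrift-generated Java client. The request and response
shapes follow the IDL above, but the constructor and getter signatures assume
standard Thrift Java codegen (required fields in declaration order, bean-style
getters) rather than anything taken from this patch, and the db/table names
are placeholders.

import java.util.Arrays;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;

public class WriteIdRpcSketch {
  static void demo(ThriftHiveMetastore.Iface client, long txnId, String validTxnList)
      throws Exception {
    // A writer asks the metastore to allocate a per-table write id inside its txn.
    AllocateTableWriteIdsRequest alloc =
        new AllocateTableWriteIdsRequest(Arrays.asList(txnId), "default", "acid_tbl");
    for (TxnToWriteId t2w : client.allocate_table_write_ids(alloc).getTxnToWriteIds()) {
      System.out.println("txn " + t2w.getTxnId() + " -> write id " + t2w.getWriteId());
    }
    // A reader turns its txn-level snapshot into per-table write id snapshots.
    GetValidWriteIdsRequest req =
        new GetValidWriteIdsRequest(Arrays.asList("default.acid_tbl"), validTxnList);
    for (TableValidWriteIds t : client.get_valid_write_ids(req).getTblValidWriteIds()) {
      System.out.println(t.getFullTableName() + " write id HWM: " + t.getWriteIdHighWaterMark());
    }
  }
}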
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
deleted file mode 100644
index 94b8c58..0000000
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.common;
-
-import java.util.Arrays;
-import java.util.BitSet;
-
-/**
- * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
- *
- * Compaction should only include txns up to smallest open txn (exclussive).
- * There may be aborted txns in the snapshot represented by this ValidCompactorTxnList.
- * Thus {@link #isTxnRangeValid(long, long)} returns NONE for any range that inluces any unresolved
- * transactions. Any txn above {@code highWatermark} is unresolved.
- * These produce the logic we need to assure that the compactor only sees records less than the lowest
- * open transaction when choosing which files to compact, but that it still ignores aborted
- * records when compacting.
- *
- * See org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList() for proper
- * way to construct this.
- */
-public class ValidCompactorTxnList extends ValidReadTxnList {
- public ValidCompactorTxnList() {
- super();
- }
- public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark) {
- this(abortedTxnList, abortedBits, highWatermark, Long.MAX_VALUE);
- }
- /**
- * @param abortedTxnList list of all aborted transactions
- * @param abortedBits bitset marking whether the corresponding transaction is aborted
- * @param highWatermark highest committed transaction to be considered for compaction,
- * equivalently (lowest_open_txn - 1).
- */
- public ValidCompactorTxnList(long[] abortedTxnList, BitSet abortedBits, long highWatermark, long minOpenTxnId) {
- // abortedBits should be all true as everything in exceptions are aborted txns
- super(abortedTxnList, abortedBits, highWatermark, minOpenTxnId);
- if(this.exceptions.length <= 0) {
- return;
- }
- //now that exceptions (aka abortedTxnList) is sorted
- int idx = Arrays.binarySearch(this.exceptions, highWatermark);
- int lastElementPos;
- if(idx < 0) {
- int insertionPoint = -idx - 1 ;//see Arrays.binarySearch() JavaDoc
- lastElementPos = insertionPoint - 1;
- }
- else {
- lastElementPos = idx;
- }
- /*
- * ensure that we throw out any exceptions above highWatermark to make
- * {@link #isTxnValid(long)} faster
- */
- this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1);
- }
- public ValidCompactorTxnList(String value) {
- super(value);
- }
- /**
- * Returns org.apache.hadoop.hive.common.ValidTxnList.RangeResponse.ALL if all txns in
- * the range are resolved and RangeResponse.NONE otherwise
- */
- @Override
- public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
- return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
- }
-
- @Override
- public boolean isTxnAborted(long txnid) {
- return Arrays.binarySearch(exceptions, txnid) >= 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
new file mode 100644
index 0000000..9f6cf47
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+/**
+ * An implementation of {@link ValidWriteIdList} for use by the compactor.
+ *
+ * Compaction should only include write ids up to the smallest open write id (exclusive).
+ * There may be aborted write ids in the snapshot represented by this ValidCompactorWriteIdList.
+ * Thus {@link #isWriteIdRangeValid(long, long)} returns NONE for any range that includes any unresolved
+ * write ids. Any write id above {@code highWatermark} is unresolved.
+ * These produce the logic we need to ensure that the compactor only sees records below the lowest
+ * open write id when choosing which files to compact, but that it still ignores aborted
+ * records when compacting.
+ *
+ * See org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList() for the proper
+ * way to construct this.
+ */
+public class ValidCompactorWriteIdList extends ValidReaderWriteIdList {
+ public ValidCompactorWriteIdList() {
+ super();
+ }
+ public ValidCompactorWriteIdList(String tableName, long[] abortedWriteIdList, BitSet abortedBits,
+ long highWatermark) {
+ this(tableName, abortedWriteIdList, abortedBits, highWatermark, Long.MAX_VALUE);
+ }
+ /**
+ * @param tableName table under compaction; full name of the format <db_name>.<table_name>
+ * @param abortedWriteIdList list of all aborted write ids
+ * @param abortedBits bitset marking whether the corresponding write id is aborted
+ * @param highWatermark highest committed write id to be considered for compaction,
+ * equivalently (lowest_open_write_id - 1).
+ * @param minOpenWriteId minimum write id which maps to an open transaction
+ */
+ public ValidCompactorWriteIdList(String tableName, long[] abortedWriteIdList, BitSet abortedBits,
+ long highWatermark, long minOpenWriteId) {
+ // abortedBits should be all true as everything in exceptions is an aborted write id
+ super(tableName, abortedWriteIdList, abortedBits, highWatermark, minOpenWriteId);
+ if(this.exceptions.length <= 0) {
+ return;
+ }
+ // now that exceptions (aka abortedWriteIdList) is sorted
+ int idx = Arrays.binarySearch(this.exceptions, highWatermark);
+ int lastElementPos;
+ if(idx < 0) {
+ int insertionPoint = -idx - 1 ;//see Arrays.binarySearch() JavaDoc
+ lastElementPos = insertionPoint - 1;
+ }
+ else {
+ lastElementPos = idx;
+ }
+ /*
+ * ensure that we throw out any exceptions above highWatermark to make
+ * {@link #isWriteIdValid(long)} faster
+ */
+ this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1);
+ }
+ public ValidCompactorWriteIdList(String value) {
+ super(value);
+ }
+ /**
+ * Returns org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse.ALL if all write ids in
+ * the range are resolved and RangeResponse.NONE otherwise
+ */
+ @Override
+ public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) {
+ return highWatermark >= maxWriteId ? RangeResponse.ALL : RangeResponse.NONE;
+ }
+
+ @Override
+ public boolean isWriteIdAborted(long writeId) {
+ return Arrays.binarySearch(exceptions, writeId) >= 0;
+ }
+}
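
A tiny usage sketch of the class above, with invented values, showing the
semantics the javadoc describes: ranges wholly at or below the high watermark
are fully resolved, while anything reaching past it is not.

import java.util.BitSet;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;

public class CompactorWriteIdListDemo {
  public static void main(String[] args) {
    long[] aborted = {3L, 5L};          // sorted aborted write ids of the table
    BitSet abortedBits = new BitSet();
    abortedBits.set(0, aborted.length); // every exception must be aborted here
    ValidCompactorWriteIdList writeIds =
        new ValidCompactorWriteIdList("default.acid_tbl", aborted, abortedBits, 7L);
    System.out.println(writeIds.isWriteIdRangeValid(1L, 7L)); // ALL: nothing above the HWM
    System.out.println(writeIds.isWriteIdRangeValid(6L, 9L)); // NONE: 8 and 9 are unresolved
    System.out.println(writeIds.isWriteIdAborted(5L));        // true
    System.out.println(writeIds.isWriteIdAborted(4L));        // false
  }
}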
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index ccdd4b7..dd432d9 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -59,24 +59,16 @@ public class ValidReadTxnList implements ValidTxnList {
@Override
public boolean isTxnValid(long txnid) {
- if (highWatermark < txnid) {
+ if (txnid > highWatermark) {
return false;
}
return Arrays.binarySearch(exceptions, txnid) < 0;
}
- /**
- * We cannot use a base file if its range contains an open txn.
- * @param txnid from base_xxxx
- */
- @Override
- public boolean isValidBase(long txnid) {
- return minOpenTxn > txnid && txnid <= highWatermark;
- }
@Override
public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
// check the easy cases first
- if (highWatermark < minTxnId) {
+ if (minTxnId > highWatermark) {
return RangeResponse.NONE;
} else if (exceptions.length > 0 && exceptions[0] > maxTxnId) {
return RangeResponse.ALL;