You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/03/23 15:09:05 UTC
[hive] branch master updated: HIVE-23047: Calculate the epoch on DB side (Peter Vary reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1cc00b1 HIVE-23047: Calculate the epoch on DB side (Peter Vary reviewed by Denys Kuzmenko)
1cc00b1 is described below
commit 1cc00b190a5a53deac2d433e15da25aa4d5d1b42
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Mon Mar 23 16:08:35 2020 +0100
HIVE-23047: Calculate the epoch on DB side (Peter Vary reviewed by Denys Kuzmenko)
---
.../hadoop/hive/metastore/txn/TxnHandler.java | 103 +++++++++++++--------
1 file changed, 63 insertions(+), 40 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 347cb6e..06defdb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Savepoint;
import java.sql.Statement;
+import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -603,7 +604,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
- long now = getDbTime(dbConn);
List<Long> txnIds = new ArrayList<>(numTxns);
List<String> rows = new ArrayList<>();
@@ -611,9 +611,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
params.add(rqst.getUser());
params.add(rqst.getHostname());
List<List<String>> paramsList = new ArrayList<>(numTxns);
+ String dbEpochString = getDbEpochString();
for (long i = first; i < first + numTxns; i++) {
txnIds.add(i);
- rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ",?,?," + txnType.getValue());
+ rows.add(i + "," + quoteChar(TXN_OPEN) + "," + dbEpochString + "," + dbEpochString + ",?,?,"
+ + txnType.getValue());
paramsList.add(params);
}
insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
@@ -2521,7 +2523,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
List<String> rows = new ArrayList<>();
List<List<String>> paramsList = new ArrayList<>();
long intLockId = 0;
- long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn));
+ String lastHBString = (isValidTxn(txnid) ? "0" : getDbEpochString());
for (LockComponent lc : rqst.getComponent()) {
if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
(MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
@@ -2554,9 +2556,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " +
((tblName == null) ? "null" : "?") + ", " +
((partName == null) ? "null" : "?") + ", " +
- quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
- //for locks associated with a txn, we always heartbeat txn and timeout based on that
- lastHB + ", " +
+ quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
+ //for locks associated with a txn, we always heartbeat txn and timeout based on that
+ lastHBString + ", " +
((rqst.getUser() == null) ? "null" : "?") + ", " +
((rqst.getHostname() == null) ? "null" : "?") + ", " +
((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")";
@@ -2959,7 +2961,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
txnIds.add(txn);
}
TxnUtils.buildQueryWithINClause(conf, queries,
- new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbTime(dbConn) +
+ new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() +
" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "),
new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false);
int updateCnt = 0;
@@ -3924,6 +3926,37 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
/**
+ * Returns the database specific SQL expression which, evaluated on the database side, yields the
+ * current time as milliseconds since the epoch (1970-01-01 00:00:00 UTC).
+ * @return The SQL expression which evaluates to the current timestamp milliseconds value
+ * @throws MetaException For unknown database type
+ */
+ protected String getDbEpochString() throws MetaException {
+ switch (dbProduct) {
+ case DERBY:
+ // Diff current_timestamp against the epoch rendered in the JVM default zone (Timestamp#toString);
+ // NOTE(review): presumes Derby's current_timestamp is in that same zone - confirm (DST offsets).
+ return "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + new Timestamp(0) +
+ "'), current_timestamp) } / 1000000";
+ case MYSQL:
+ // UNIX_TIMESTAMP is documented for DATE/DATETIME/TIMESTAMP arguments, but CURTIME() is a TIME;
+ // NOW(3) is a DATETIME with millisecond precision, so the fractional part is preserved.
+ return "round(unix_timestamp(now(3)) * 1000)";
+ case POSTGRES:
+ return "round(extract(epoch from current_timestamp) * 1000)";
+ case ORACLE:
+ return "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " +
+ "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)";
+ case SQLSERVER:
+ return "datediff_big(millisecond, '19700101', sysutcdatetime())";
+ default:
+ String msg = "Unknown database product: " + dbProduct.toString();
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
+ }
+
+ /**
* Determine the current time, using the RDBMS as a source of truth
* @param conn database connection
* @return current time in milliseconds
@@ -4204,8 +4237,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
- private int abortTxns(Connection dbConn, List<Long> txnids, boolean isStrict) throws SQLException {
- return abortTxns(dbConn, txnids, -1, isStrict);
+ private int abortTxns(Connection dbConn, List<Long> txnids, boolean isStrict) throws SQLException, MetaException {
+ return abortTxns(dbConn, txnids, false, isStrict);
}
/**
* TODO: expose this as an operation to client. Useful for streaming API to abort all remaining
@@ -4215,8 +4248,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
*
* @param dbConn An active connection
* @param txnids list of transactions to abort
- * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were
- * hearbetated after #performTimeOuts() select and this operation.
+ * @param checkHeartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were
+ * heartbeated after #performTimeOuts() select and this operation.
* @param isStrict true for strict mode, false for best-effort mode.
* In strict mode, if all txns are not successfully aborted, then the count of
* updated ones will be returned and the caller will roll back.
@@ -4224,8 +4257,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* @return Number of aborted transactions
* @throws SQLException
*/
- private int abortTxns(Connection dbConn, List<Long> txnids, long max_heartbeat, boolean isStrict)
- throws SQLException {
+ private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbeat, boolean isStrict)
+ throws SQLException, MetaException {
Statement stmt = null;
int updateCnt = 0;
if (txnids.isEmpty()) {
@@ -4242,10 +4275,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND ");
- if(max_heartbeat > 0) {
- suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(max_heartbeat);
- } else {
- suffix.append("");
+ if(checkHeartbeat) {
+ suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(getDbEpochString()).append("-").append(timeout);
}
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
@@ -4264,7 +4295,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
suffix.setLength(0);
prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE ");
- suffix.append("");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false);
@@ -4290,7 +4320,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
suffix.setLength(0);
prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE ");
- suffix.append("");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
@@ -4493,11 +4522,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
long txnId = locksBeingChecked.get(0).txnId;
long extLockId = locksBeingChecked.get(0).extLockId;
- long now = getDbTime(dbConn);
String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_STATE\" = '" + LOCK_ACQUIRED + "', " +
//if lock is part of txn, heartbeat info is in txn record
- "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : now) +
- ", \"HL_ACQUIRED_AT\" = " + now + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" +
+ "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : getDbEpochString()) +
+ ", \"HL_ACQUIRED_AT\" = " + getDbEpochString() + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" +
" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
@@ -4540,10 +4568,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Statement stmt = null;
try {
stmt = dbConn.createStatement();
- long now = getDbTime(dbConn);
String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " +
- now + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
+ getDbEpochString() + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
if (rc < 1) {
@@ -4566,8 +4593,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Statement stmt = null;
try {
stmt = dbConn.createStatement();
- long now = getDbTime(dbConn);
- String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + now +
+ String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() +
" WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'";
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
@@ -4816,15 +4842,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// for read-only autoCommit=true statements. This does a commit,
// and thus should be done before any calls to heartbeat that will leave
// open transactions.
- private void timeOutLocks(Connection dbConn, long now) {
+ private void timeOutLocks(Connection dbConn) {
Statement stmt = null;
ResultSet rs = null;
try {
stmt = dbConn.createStatement();
- long maxHeartbeatTime = now - timeout;
//doing a SELECT first is less efficient but makes it easier to debug things
String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " +
- maxHeartbeatTime + " AND \"HL_TXNID\" = 0";//when txnid is <> 0, the lock is
+ getDbEpochString() + "-" + timeout + " AND \"HL_TXNID\" = 0"; //when txnid is <> 0, the lock is
//associated with a txn and is handled by performTimeOuts()
//want to avoid expiring locks for a txn w/o expiring the txn itself
List<Long> extLockIDs = new ArrayList<>();
@@ -4845,9 +4870,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//include same hl_last_heartbeat condition in case someone heartbeated since the select
prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < ");
- prefix.append(maxHeartbeatTime);
+ prefix.append(getDbEpochString()).append("-").append(timeout);
prefix.append(" AND \"HL_TXNID\" = 0 AND ");
- suffix.append("");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false);
@@ -4859,7 +4883,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if(deletedLocks > 0) {
Collections.sort(extLockIDs);//easier to read logs
LOG.info("Deleted " + deletedLocks + " int locks from HIVE_LOCKS due to timeout (" +
- "HL_LOCK_EXT_ID list: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime);
+ "HL_LOCK_EXT_ID list: " + extLockIDs + ")");
}
LOG.debug("Going to commit");
dbConn.commit();
@@ -4899,12 +4923,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//is made, in which case heartbeat will succeed but txn will still be Aborted.
//Solving this corner case is not worth the perf penalty. The client should heartbeat in a
//timely way.
- long now = getDbTime(dbConn);
- timeOutLocks(dbConn, now);
+ timeOutLocks(dbConn);
while(true) {
stmt = dbConn.createStatement();
String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN +
- "' AND \"TXN_LAST_HEARTBEAT\" < " + (now - timeout) + " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue();
+ "' AND \"TXN_LAST_HEARTBEAT\" < " + getDbEpochString() + "-" + timeout +
+ " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue();
//safety valve for extreme cases
s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
LOG.debug("Going to execute query <" + s + ">");
@@ -4926,7 +4950,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
close(rs, stmt, null);
int numTxnsAborted = 0;
for(List<Long> batchToAbort : timedOutTxns) {
- if(abortTxns(dbConn, batchToAbort, now - timeout, true) == batchToAbort.size()) {
+ if(abortTxns(dbConn, batchToAbort, true, true) == batchToAbort.size()) {
dbConn.commit();
numTxnsAborted += batchToAbort.size();
//todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
@@ -4945,10 +4969,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
LOG.info("Aborted " + numTxnsAborted + " transactions due to timeout");
}
} catch (SQLException ex) {
- LOG.warn("Aborting timedout transactions failed due to " + getMessage(ex), ex);
- }
- catch(MetaException e) {
- LOG.warn("Aborting timedout transactions failed due to " + e.getMessage(), e);
+ LOG.warn("Aborting timed out transactions failed due to " + getMessage(ex), ex);
+ } catch(MetaException e) {
+ LOG.warn("Aborting timed out transactions failed due to " + e.getMessage(), e);
}
finally {
close(rs, stmt, dbConn);