You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/03/23 15:09:05 UTC

[hive] branch master updated: HIVE-23047: Calculate the epoch on DB side (Peter Vary reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cc00b1  HIVE-23047: Calculate the epoch on DB side (Peter Vary reviewed by Denys Kuzmenko)
1cc00b1 is described below

commit 1cc00b190a5a53deac2d433e15da25aa4d5d1b42
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Mon Mar 23 16:08:35 2020 +0100

    HIVE-23047: Calculate the epoch on DB side (Peter Vary reviewed by Denys Kuzmenko)
---
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 103 +++++++++++++--------
 1 file changed, 63 insertions(+), 40 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 347cb6e..06defdb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Savepoint;
 import java.sql.Statement;
+import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -603,7 +604,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       LOG.debug("Going to execute update <" + s + ">");
       stmt.executeUpdate(s);
 
-      long now = getDbTime(dbConn);
       List<Long> txnIds = new ArrayList<>(numTxns);
 
       List<String> rows = new ArrayList<>();
@@ -611,9 +611,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       params.add(rqst.getUser());
       params.add(rqst.getHostname());
       List<List<String>> paramsList = new ArrayList<>(numTxns);
+      String dbEpochString = getDbEpochString();
       for (long i = first; i < first + numTxns; i++) {
         txnIds.add(i);
-        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ",?,?," + txnType.getValue());
+        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + dbEpochString + "," + dbEpochString + ",?,?,"
+                     + txnType.getValue());
         paramsList.add(params);
       }
       insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
@@ -2521,7 +2523,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         List<String> rows = new ArrayList<>();
         List<List<String>> paramsList = new ArrayList<>();
         long intLockId = 0;
-        long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn));
+        String lastHBString = (isValidTxn(txnid) ? "0" : getDbEpochString());
         for (LockComponent lc : rqst.getComponent()) {
           if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
             (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
@@ -2554,9 +2556,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " +
                   ((tblName == null) ? "null" : "?") + ", " +
                   ((partName == null) ? "null" : "?") + ", " +
-            quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
-            //for locks associated with a txn, we always heartbeat txn and timeout based on that
-            lastHB + ", " +
+                  quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
+                  //for locks associated with a txn, we always heartbeat txn and timeout based on that
+                  lastHBString + ", " +
                   ((rqst.getUser() == null) ? "null" : "?")  + ", " +
                   ((rqst.getHostname() == null) ? "null" : "?") + ", " +
                   ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")";
@@ -2959,7 +2961,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           txnIds.add(txn);
         }
         TxnUtils.buildQueryWithINClause(conf, queries,
-          new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbTime(dbConn) +
+          new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() +
             " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "),
           new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false);
         int updateCnt = 0;
@@ -3924,6 +3926,37 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   }
 
   /**
+   * Builds the database specific SQL expression which, when embedded in a statement, evaluates
+   * to the current time in milliseconds after epoch. The returned text is a query fragment meant
+   * to be concatenated into SQL, not a timestamp value: the database computes the time when the
+   * statement containing it runs.
+   * <p>
+   * The expression is chosen by {@code dbProduct}; DERBY, MYSQL, POSTGRES, ORACLE and SQLSERVER
+   * are handled. For Derby the epoch is passed as a timestamp literal built from
+   * {@code new Timestamp(0).toString()} and cached in the static {@code epochInCurrentTimezone}
+   * field on first use; the other products use constant expressions.
+   * <p>
+   * NOTE(review): the Derby expression divides a {@code sql_tsi_frac_second} difference by
+   * 1000000, which presumably converts nanoseconds to milliseconds - confirm against the Derby
+   * documentation.
+   * <p>
+   * NOTE(review): this comment sits above the {@code epochInCurrentTimezone} declaration, so
+   * Javadoc tooling will attach it to that field rather than to {@code getDbEpochString()}.
+   * <p>
+   * NOTE(review): the lazy initialization of the static cache is not synchronized; concurrent
+   * first calls may each assign it, presumably with the same string - confirm this is acceptable.
+   * @return The SQL expression which yields the current timestamp milliseconds value
+   * @throws MetaException For unknown database type (the message is also logged at error level)
+   */
+  private static String epochInCurrentTimezone = null;
+  protected String getDbEpochString() throws MetaException {
+    switch (dbProduct) {
+      case DERBY:
+        if (epochInCurrentTimezone == null) {
+          epochInCurrentTimezone = new Timestamp(0).toString();
+        }
+        return "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + epochInCurrentTimezone +
+          "'), current_timestamp) } / 1000000";
+      case MYSQL:
+        return "round(unix_timestamp(curtime(4)) * 1000)";
+      case POSTGRES:
+        return "round(extract(epoch from current_timestamp) * 1000)";
+      case ORACLE:
+        return "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " +
+          "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)";
+      case SQLSERVER:
+        return "datediff_big(millisecond, '19700101', sysutcdatetime())";
+      default:
+        String msg = "Unknown database product: " + dbProduct.toString();
+        LOG.error(msg);
+        throw new MetaException(msg);
+    }
+  }
+
+  /**
    * Determine the current time, using the RDBMS as a source of truth
    * @param conn database connection
    * @return current time in milliseconds
@@ -4204,8 +4237,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  private int abortTxns(Connection dbConn, List<Long> txnids, boolean isStrict) throws SQLException {
-    return abortTxns(dbConn, txnids, -1, isStrict);
+  private int abortTxns(Connection dbConn, List<Long> txnids, boolean isStrict) throws SQLException, MetaException {
+    return abortTxns(dbConn, txnids, false, isStrict);
   }
   /**
    * TODO: expose this as an operation to client.  Useful for streaming API to abort all remaining
@@ -4215,8 +4248,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    *
    * @param dbConn An active connection
    * @param txnids list of transactions to abort
-   * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were
-   *                      hearbetated after #performTimeOuts() select and this operation.
+   * @param checkHeartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were
+   *                      heartbeated after #performTimeOuts() select and this operation.
    * @param isStrict true for strict mode, false for best-effort mode.
    *                 In strict mode, if all txns are not successfully aborted, then the count of
    *                 updated ones will be returned and the caller will roll back.
@@ -4224,8 +4257,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    * @return Number of aborted transactions
    * @throws SQLException
    */
-  private int abortTxns(Connection dbConn, List<Long> txnids, long max_heartbeat, boolean isStrict)
-      throws SQLException {
+  private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbeat, boolean isStrict)
+      throws SQLException, MetaException {
     Statement stmt = null;
     int updateCnt = 0;
     if (txnids.isEmpty()) {
@@ -4242,10 +4275,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
       prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
         " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND ");
-      if(max_heartbeat > 0) {
-        suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(max_heartbeat);
-      } else {
-        suffix.append("");
+      if(checkHeartbeat) {
+        suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(getDbEpochString()).append("-").append(timeout);
       }
 
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
@@ -4264,7 +4295,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       suffix.setLength(0);
 
       prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE ");
-      suffix.append("");
 
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false);
 
@@ -4290,7 +4320,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       suffix.setLength(0);
 
       prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE ");
-      suffix.append("");
 
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
 
@@ -4493,11 +4522,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
     long txnId = locksBeingChecked.get(0).txnId;
     long extLockId = locksBeingChecked.get(0).extLockId;
-    long now = getDbTime(dbConn);
     String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_STATE\" = '" + LOCK_ACQUIRED + "', " +
       //if lock is part of txn, heartbeat info is in txn record
-      "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : now) +
-      ", \"HL_ACQUIRED_AT\" = " + now + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" +
+      "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : getDbEpochString()) +
+      ", \"HL_ACQUIRED_AT\" = " + getDbEpochString() + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" +
       " WHERE \"HL_LOCK_EXT_ID\" = " +  extLockId;
     LOG.debug("Going to execute update <" + s + ">");
     int rc = stmt.executeUpdate(s);
@@ -4540,10 +4568,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Statement stmt = null;
     try {
       stmt = dbConn.createStatement();
-      long now = getDbTime(dbConn);
 
       String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " +
-        now + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
+        getDbEpochString() + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
       LOG.debug("Going to execute update <" + s + ">");
       int rc = stmt.executeUpdate(s);
       if (rc < 1) {
@@ -4566,8 +4593,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Statement stmt = null;
     try {
       stmt = dbConn.createStatement();
-      long now = getDbTime(dbConn);
-      String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + now +
+      String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() +
         " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'";
       LOG.debug("Going to execute update <" + s + ">");
       int rc = stmt.executeUpdate(s);
@@ -4816,15 +4842,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   // for read-only autoCommit=true statements.  This does a commit,
   // and thus should be done before any calls to heartbeat that will leave
   // open transactions.
-  private void timeOutLocks(Connection dbConn, long now) {
+  private void timeOutLocks(Connection dbConn) {
     Statement stmt = null;
     ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
-      long maxHeartbeatTime = now - timeout;
       //doing a SELECT first is less efficient but makes it easier to debug things
       String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " +
-        maxHeartbeatTime + " AND \"HL_TXNID\" = 0";//when txnid is <> 0, the lock is
+          getDbEpochString() + "-" + timeout + " AND \"HL_TXNID\" = 0"; //when txnid is <> 0, the lock is
       //associated with a txn and is handled by performTimeOuts()
       //want to avoid expiring locks for a txn w/o expiring the txn itself
       List<Long> extLockIDs = new ArrayList<>();
@@ -4845,9 +4870,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
       //include same hl_last_heartbeat condition in case someone heartbeated since the select
       prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < ");
-      prefix.append(maxHeartbeatTime);
+      prefix.append(getDbEpochString()).append("-").append(timeout);
       prefix.append(" AND \"HL_TXNID\" = 0 AND ");
-      suffix.append("");
 
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false);
 
@@ -4859,7 +4883,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       if(deletedLocks > 0) {
         Collections.sort(extLockIDs);//easier to read logs
         LOG.info("Deleted " + deletedLocks + " int locks from HIVE_LOCKS due to timeout (" +
-          "HL_LOCK_EXT_ID list:  " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime);
+          "HL_LOCK_EXT_ID list:  " + extLockIDs + ")");
       }
       LOG.debug("Going to commit");
       dbConn.commit();
@@ -4899,12 +4923,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       //is made, in which case heartbeat will succeed but txn will still be Aborted.
       //Solving this corner case is not worth the perf penalty.  The client should heartbeat in a
       //timely way.
-      long now = getDbTime(dbConn);
-      timeOutLocks(dbConn, now);
+      timeOutLocks(dbConn);
       while(true) {
         stmt = dbConn.createStatement();
         String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN +
-            "' AND \"TXN_LAST_HEARTBEAT\" <  " + (now - timeout) + " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue();
+            "' AND \"TXN_LAST_HEARTBEAT\" <  " + getDbEpochString() + "-" + timeout +
+            " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue();
         //safety valve for extreme cases
         s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
         LOG.debug("Going to execute query <" + s + ">");
@@ -4926,7 +4950,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         close(rs, stmt, null);
         int numTxnsAborted = 0;
         for(List<Long> batchToAbort : timedOutTxns) {
-          if(abortTxns(dbConn, batchToAbort, now - timeout, true) == batchToAbort.size()) {
+          if(abortTxns(dbConn, batchToAbort, true, true) == batchToAbort.size()) {
             dbConn.commit();
             numTxnsAborted += batchToAbort.size();
             //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
@@ -4945,10 +4969,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         LOG.info("Aborted " + numTxnsAborted + " transactions due to timeout");
       }
     } catch (SQLException ex) {
-      LOG.warn("Aborting timedout transactions failed due to " + getMessage(ex), ex);
-    }
-    catch(MetaException e) {
-      LOG.warn("Aborting timedout transactions failed due to " + e.getMessage(), e);
+      LOG.warn("Aborting timed out transactions failed due to " + getMessage(ex), ex);
+    } catch(MetaException e) {
+      LOG.warn("Aborting timed out transactions failed due to " + e.getMessage(), e);
     }
     finally {
       close(rs, stmt, dbConn);