You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by ek...@apache.org on 2015/12/14 20:48:22 UTC

hive git commit: HIVE-12620 Misc improvement to Acid module (Eugene Koifman, reviewed by Wei Zheng, Jason Dere)

Repository: hive
Updated Branches:
  refs/heads/master 69f03b86c -> 49dc6452a


HIVE-12620 Misc improvement to Acid module (Eugene Koifman, reviewed by Wei Zheng, Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/49dc6452
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/49dc6452
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/49dc6452

Branch: refs/heads/master
Commit: 49dc6452acf022e463318b0042f7ad918927bc5e
Parents: 69f03b8
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon Dec 14 11:46:06 2015 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon Dec 14 11:46:20 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 60 ++++++++++++++++----
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   | 47 ++++++++++++++-
 .../hive/ql/lockmgr/TestDbTxnManager.java       | 41 ++++++++++---
 3 files changed, 126 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/49dc6452/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 8ff2195..50d8892 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -529,8 +529,7 @@ public class TxnHandler {
         else {
           heartbeatLock(dbConn, extLockId);
         }
-        closeDbConn(dbConn);
-        dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+        dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
@@ -1099,6 +1098,10 @@ public class TxnHandler {
           LOG.error("Fatal error. Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
         }
       }
+      else {
+        //make sure we know we saw an error that we don't recognize
+        LOG.info("Non-retryable error: " + getMessage(e));
+      }
     }
     finally {
       /*if this method ends with anything except a retry signal, the caller should fail the operation
@@ -1577,7 +1580,7 @@ public class TxnHandler {
         return checkLock(dbConn, extLockId);
       } catch (NoSuchLockException e) {
         // This should never happen, as we just added the lock id
-        throw new MetaException("Couldn't find a lock we just created!");
+        throw new MetaException("Couldn't find a lock we just created! " + e.getMessage());
       } finally {
         close(rs);
         closeStmt(stmt);
@@ -1706,7 +1709,7 @@ public class TxnHandler {
         if (index == -1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
-          throw new MetaException("How did we get here, we heartbeated our lock before we started!");
+          throw new MetaException("How did we get here, we heartbeated our lock before we started! ( " + info + ")");
         }
 
 
@@ -1972,17 +1975,50 @@ public class TxnHandler {
   // open transactions.
   private void timeOutLocks(Connection dbConn, long now) {
     Statement stmt = null;
+    ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
-      // Remove any timed out locks from the table.
-      String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " +
-        (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is
+      long maxHeartbeatTime = now - timeout;
+      //doing a SELECT first is less efficient but makes it easier to debug things
+      String s = "select distinct hl_lock_ext_id from HIVE_LOCKS where hl_last_heartbeat < " +
+        maxHeartbeatTime + " and hl_txnid = 0";//when txnid is <> 0, the lock is
       //associated with a txn and is handled by performTimeOuts()
       //want to avoid expiring locks for a txn w/o expiring the txn itself
-      LOG.debug("Going to execute update <" + s + ">");
-      int deletedLocks = stmt.executeUpdate(s);
+      List<Long> extLockIDs = new ArrayList<>();
+      rs = stmt.executeQuery(s);
+      while(rs.next()) {
+        extLockIDs.add(rs.getLong(1));
+      }
+      rs.close();
+      dbConn.commit();
+      if(extLockIDs.size() <= 0) {
+        return;
+      }
+      int deletedLocks = 0;
+      //include same hl_last_heartbeat condition in case someone heartbeated since the select
+      s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + maxHeartbeatTime + " and hl_txnid = 0" +
+        " and hl_lock_ext_id IN (";
+      int numWholeBatches = extLockIDs.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE;
+      for(int i = 0; i < numWholeBatches; i++) {
+        StringBuilder sb = new StringBuilder(s);
+        for(int j = i * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j < (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE; j++) {
+          sb.append(extLockIDs.get(j)).append(",");
+        }
+        sb.setCharAt(sb.length() - 1, ')');
+        LOG.debug("Removing expired locks via: " + sb.toString());
+        deletedLocks += stmt.executeUpdate(sb.toString());
+        dbConn.commit();
+      }
+      StringBuilder sb = new StringBuilder(s);
+      for(int i = numWholeBatches * TIMED_OUT_TXN_ABORT_BATCH_SIZE; i < extLockIDs.size(); i++) {
+        sb.append(extLockIDs.get(i)).append(",");
+      }
+      sb.setCharAt(sb.length() - 1, ')');
+      LOG.debug("Removing expired locks via: " + sb.toString());
+      deletedLocks += stmt.executeUpdate(sb.toString());
       if(deletedLocks > 0) {
-        LOG.info("Deleted " + deletedLocks + " locks from HIVE_LOCKS due to timeout");
+        LOG.info("Deleted " + deletedLocks + " ext locks from HIVE_LOCKS due to timeout (vs. " +
+          extLockIDs.size() + " found. List: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime);
       }
       LOG.debug("Going to commit");
       dbConn.commit();
@@ -1993,6 +2029,7 @@ public class TxnHandler {
     catch(Exception ex) {
       LOG.error("Failed to purge timedout locks due to: " + ex.getMessage(), ex);
     } finally {
+      close(rs);
       closeStmt(stmt);
     }
   }
@@ -2265,7 +2302,8 @@ public class TxnHandler {
         //in MSSQL this means Communication Link Failure
         return true;
       }
-      if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState())) {
+      if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
+        sqlException.getMessage().contains("consistent read failure; rollback data not available")) {
         return true;
       }
       //see also https://issues.apache.org/jira/browse/HIVE-9938

http://git-wip-us.apache.org/repos/asf/hive/blob/49dc6452/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 7d58622..7fa57d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -52,6 +52,7 @@ public class DbLockManager implements HiveLockManager{
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   private long MAX_SLEEP;
+  //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
   private IMetaStoreClient client;
   private long nextSleep = 50;
@@ -115,7 +116,27 @@ public class DbLockManager implements HiveLockManager{
 
       }
       long retryDuration = System.currentTimeMillis() - startRetry;
-      DbHiveLock hl = new DbHiveLock(res.getLockid());
+      DbHiveLock hl = new DbHiveLock(res.getLockid(), queryId, lock.getTxnid());
+      if(locks.size() > 0) {
+        boolean logMsg = false;
+        for(DbHiveLock l : locks) {
+          if(l.txnId != hl.txnId) {
+            //locks from different transactions detected (or from transaction and read-only query in autocommit)
+            logMsg = true;
+            break;
+          }
+          else if(l.txnId == 0) {
+            if(!l.queryId.equals(hl.queryId)) {
+              //here means no open transaction, but different queries
+              logMsg = true;
+              break;
+            }
+          }
+        }
+        if(logMsg) {
+          LOG.warn("adding new DbHiveLock(" + hl + ") while we are already tracking locks: " + locks);
+        }
+      }
       locks.add(hl);
       if (res.getState() != LockState.ACQUIRED) {
         if(res.getState() == LockState.WAITING) {
@@ -191,10 +212,12 @@ public class DbLockManager implements HiveLockManager{
   @Override
   public void unlock(HiveLock hiveLock) throws LockException {
     long lockId = ((DbHiveLock)hiveLock).lockId;
+    boolean removed = false;
     try {
       LOG.debug("Unlocking " + hiveLock);
       client.unlock(lockId);
-      boolean removed = locks.remove(hiveLock);
+      //important to remove after unlock() in case it fails
+      removed = locks.remove(hiveLock);
       Metrics metrics = MetricsFactory.getInstance();
       if (metrics != null) {
         try {
@@ -205,6 +228,9 @@ public class DbLockManager implements HiveLockManager{
       }
       LOG.debug("Removed a lock " + removed);
     } catch (NoSuchLockException e) {
+      //if metastore has no record of this lock, it most likely timed out; either way
+      //there is no point tracking it here any longer
+      removed = locks.remove(hiveLock);
       LOG.error("Metastore could find no record of lock " + JavaUtils.lockIdToString(lockId));
       throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
     } catch (TxnOpenException e) {
@@ -214,10 +240,16 @@ public class DbLockManager implements HiveLockManager{
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
     }
+    finally {
+      if(removed) {
+        LOG.debug("Removed a lock " + hiveLock);
+      }
+    }
   }
 
   @Override
   public void releaseLocks(List<HiveLock> hiveLocks) {
+    LOG.info("releaseLocks: " + hiveLocks);
     for (HiveLock lock : hiveLocks) {
       try {
         unlock(lock);
@@ -225,6 +257,8 @@ public class DbLockManager implements HiveLockManager{
         // Not sure why this method doesn't throw any exceptions,
         // but since the interface doesn't allow it we'll just swallow them and
         // move on.
+        //This is OK-ish since releaseLocks() is only called for RO/AC queries; it
+        //would be really bad to eat exceptions here for write operations
       }
     }
   }
@@ -271,10 +305,17 @@ public class DbLockManager implements HiveLockManager{
   static class DbHiveLock extends HiveLock {
 
     long lockId;
+    String queryId;
+    long txnId;
 
     DbHiveLock(long id) {
       lockId = id;
     }
+    DbHiveLock(long id, String queryId, long txnId) {
+      lockId = id;
+      this.queryId = queryId;
+      this.txnId = txnId;
+    }
 
     @Override
     public HiveLockObject getHiveLockObject() {
@@ -301,7 +342,7 @@ public class DbLockManager implements HiveLockManager{
     }
     @Override
     public String toString() {
-      return JavaUtils.lockIdToString(lockId);
+      return JavaUtils.lockIdToString(lockId) + " queryId=" + queryId + " " + JavaUtils.txnIdToString(txnId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/49dc6452/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index b7d1d18..f82b85a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -198,9 +199,8 @@ public class TestDbTxnManager {
   }
   @Test
   public void testExceptions() throws Exception {
-    WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
+    addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.acquireLocks(qp, ctx, "PeterI");
     txnMgr.openTxn("NicholasII");
     runReaper();
     LockException exception = null;
@@ -240,6 +240,32 @@ public class TestDbTxnManager {
   }
 
   @Test
+  public void testLockTimeout() throws Exception {
+    addPartitionInput(newTable(true));
+    QueryPlan qp = new MockQueryPlan(this);
+    //make sure it works with nothing to expire
+    expireLocks(txnMgr, 0);
+    //create a few read locks, all on the same resource
+    for(int i = 0; i < 5; i++) {
+      txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+    }
+    expireLocks(txnMgr, 5);
+    //create a lot of locks
+    for(int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
+      txnMgr.acquireLocks(qp, ctx, "PeterI" + i);
+    }
+    expireLocks(txnMgr, TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+  }
+  private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
+    DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
+    ShowLocksResponse resp = lockManager.getLocks();
+    Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size());
+    runReaper();
+    resp = lockManager.getLocks();
+    Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
+  }
+
+  @Test
   public void testReadWrite() throws Exception {
     Table t = newTable(true);
     addPartitionInput(t);
@@ -359,6 +385,7 @@ public class TestDbTxnManager {
   public void setUp() throws Exception {
     TxnDbUtil.prepDb();
     txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr.getLockManager();//init lock manager
     Assert.assertTrue(txnMgr instanceof DbTxnManager);
     nextInput = 1;
     readEntities = new HashSet<ReadEntity>();
@@ -376,15 +403,13 @@ public class TestDbTxnManager {
   }
 
   private static class MockQueryPlan extends QueryPlan {
-    private final HashSet<ReadEntity> inputs;
-    private final HashSet<WriteEntity> outputs;
+    private final HashSet<ReadEntity> inputs = new HashSet<>();
+    private final HashSet<WriteEntity> outputs = new HashSet<>();
     private final String queryId;
     
     MockQueryPlan(TestDbTxnManager test) {
-      HashSet<ReadEntity> r = test.readEntities;
-      HashSet<WriteEntity> w = test.writeEntities;
-      inputs = (r == null) ? new HashSet<ReadEntity>() : r;
-      outputs = (w == null) ? new HashSet<WriteEntity>() : w;
+      inputs.addAll(test.readEntities);
+      outputs.addAll(test.writeEntities);
       queryId = makeQueryId();
     }