You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2016/07/02 01:42:46 UTC

hive git commit: HIVE-13725: ACID: Streaming API should synchronize calls when multiple threads use the same endpoint (Eugene Koifman, Vaibhav Gumashta reviewed by Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/branch-2.1 21da55964 -> c2e9c59b4


HIVE-13725: ACID: Streaming API should synchronize calls when multiple threads use the same endpoint (Eugene Koifman, Vaibhav Gumashta reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c2e9c59b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c2e9c59b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c2e9c59b

Branch: refs/heads/branch-2.1
Commit: c2e9c59b4329059cc1fff757394aac911adef51c
Parents: 21da559
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Sun Jun 26 23:23:46 2016 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Fri Jul 1 18:37:13 2016 -0700

----------------------------------------------------------------------
 .../hive/hcatalog/streaming/HiveEndPoint.java   |  24 +++--
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |  11 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 102 ++++++++++++++++---
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  52 +++++++++-
 4 files changed, 160 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c2e9c59b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 017f565..452cb15 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -277,6 +277,7 @@ public class HiveEndPoint {
 
   private static class ConnectionImpl implements StreamingConnection {
     private final IMetaStoreClient msClient;
+    private final IMetaStoreClient heartbeaterMSClient;
     private final HiveEndPoint endPt;
     private final UserGroupInformation ugi;
     private final String username;
@@ -309,6 +310,9 @@ public class HiveEndPoint {
       }
       this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
       this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+      // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+      // isolated from the other transaction related RPC calls.
+      this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
       checkEndPoint(endPoint, msClient);
       if (createPart  &&  !endPoint.partitionVals.isEmpty()) {
         createPartitionIfNotExists(endPoint, msClient, conf);
@@ -366,6 +370,7 @@ public class HiveEndPoint {
     public void close() {
       if (ugi==null) {
         msClient.close();
+        heartbeaterMSClient.close();
         return;
       }
       try {
@@ -374,6 +379,7 @@ public class HiveEndPoint {
               @Override
               public Void run() throws Exception {
                 msClient.close();
+                heartbeaterMSClient.close();
                 return null;
               }
             } );
@@ -429,8 +435,8 @@ public class HiveEndPoint {
     private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
                                                   RecordWriter recordWriter)
             throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
-      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient
-              , recordWriter, agentInfo);
+      return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
+          heartbeaterMSClient, recordWriter, agentInfo);
     }
 
 
@@ -541,14 +547,14 @@ public class HiveEndPoint {
             + endPoint.metaStoreUri + ". " + e.getMessage(), e);
       }
     }
-
-
   } // class ConnectionImpl
+
   private static class TransactionBatchImpl implements TransactionBatch {
     private final String username;
     private final UserGroupInformation ugi;
     private final HiveEndPoint endPt;
     private final IMetaStoreClient msClient;
+    private final IMetaStoreClient heartbeaterMSClient;
     private final RecordWriter recordWriter;
     private final List<Long> txnIds;
 
@@ -572,9 +578,9 @@ public class HiveEndPoint {
      * @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
      */
     private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
-              final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter, 
-              String agentInfo)
-            throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+        final int numTxns, final IMetaStoreClient msClient,
+        final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo)
+        throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
       boolean success = false;
       try {
         if ( endPt.partitionVals!=null   &&   !endPt.partitionVals.isEmpty() ) {
@@ -588,6 +594,7 @@ public class HiveEndPoint {
         this.ugi = ugi;
         this.endPt = endPt;
         this.msClient = msClient;
+        this.heartbeaterMSClient = heartbeaterMSClient;
         this.recordWriter = recordWriter;
         this.agentInfo = agentInfo;
 
@@ -937,7 +944,7 @@ public class HiveEndPoint {
       Long first = txnIds.get(currentTxnIndex);
       Long last = txnIds.get(txnIds.size()-1);
       try {
-        HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last);
+        HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
         if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
           throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
         }
@@ -1045,5 +1052,4 @@ public class HiveEndPoint {
     conf.setBoolVar(var, value);
   }
 
-
 }  // class HiveEndPoint

http://git-wip-us.apache.org/repos/asf/hive/blob/c2e9c59b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 60674eb..f72c379 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -291,6 +291,9 @@ public final class TxnDbUtil {
   }
   @VisibleForTesting
   public static String queryToString(String query) throws Exception {
+    return queryToString(query, true);
+  }
+  public static String queryToString(String query, boolean includeHeader) throws Exception {
     Connection conn = null;
     Statement stmt = null;
     ResultSet rs = null;
@@ -300,10 +303,12 @@ public final class TxnDbUtil {
       stmt = conn.createStatement();
       rs = stmt.executeQuery(query);
       ResultSetMetaData rsmd = rs.getMetaData();
-      for(int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
-        sb.append(rsmd.getColumnName(colPos)).append("   ");
+      if(includeHeader) {
+        for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+          sb.append(rsmd.getColumnName(colPos)).append("   ");
+        }
+        sb.append('\n');
       }
-      sb.append('\n');
       while(rs.next()) {
         for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
           sb.append(rs.getObject(colPos)).append("   ");

http://git-wip-us.apache.org/repos/asf/hive/blob/c2e9c59b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 5b6f20c..7b0369d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -85,6 +85,32 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   private Runnable shutdownRunner = null;
   private static final int SHUTDOWN_HOOK_PRIORITY = 0;
 
+  // SynchronizedMetaStoreClient object per heartbeater thread.
+  private static ThreadLocal<SynchronizedMetaStoreClient> threadLocalMSClient =
+      new ThreadLocal<SynchronizedMetaStoreClient>() {
+
+        @Override
+        protected SynchronizedMetaStoreClient initialValue() {
+          return null;
+        }
+
+        @Override
+        public synchronized void remove() {
+          SynchronizedMetaStoreClient client = this.get();
+          if (client != null) {
+            client.close();
+          }
+          super.remove();
+        }
+      };
+
+  private static AtomicInteger heartbeaterMSClientCount = new AtomicInteger(0);
+  private int heartbeaterThreadPoolSize = 0;
+
+  private static SynchronizedMetaStoreClient getThreadLocalMSClient() {
+    return threadLocalMSClient.get();
+  }
+
   DbTxnManager() {
     shutdownRunner = new Runnable() {
       @Override
@@ -324,7 +350,6 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     return t;
   }
   /**
-   * This is for testing only. Normally client should call {@link #acquireLocks(QueryPlan, Context, String, boolean)}
    * @param delay time to delay for first heartbeat
    */
   @VisibleForTesting
@@ -418,7 +443,29 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     for (HiveLock lock : locks) {
       long lockId = ((DbLockManager.DbHiveLock)lock).lockId;
       try {
-        client.heartbeat(txnId, lockId);
+        // Get the threadlocal metastore client for the heartbeat calls.
+        SynchronizedMetaStoreClient heartbeaterClient = getThreadLocalMSClient();
+        if (heartbeaterClient == null) {
+          Hive db;
+          try {
+            db = Hive.get(conf);
+            // Create a new threadlocal synchronized metastore client for use in heartbeater threads.
+            // This makes the concurrent use of heartbeat thread safe, and won't cause transaction
+            // abort due to a long metastore client call blocking the heartbeat call.
+            heartbeaterClient = new SynchronizedMetaStoreClient(db.getMSC());
+            threadLocalMSClient.set(heartbeaterClient);
+          } catch (HiveException e) {
+            LOG.error("Unable to create new metastore client for heartbeating", e);
+            throw new LockException(e);
+          }
+          // Increment the threadlocal metastore client count
+          if (heartbeaterMSClientCount.incrementAndGet() >= heartbeaterThreadPoolSize) {
+            LOG.warn("The number of hearbeater metastore clients - + "
+                + heartbeaterMSClientCount.get() + ", has exceeded the max limit - "
+                + heartbeaterThreadPoolSize);
+          }
+        }
+        heartbeaterClient.heartbeat(txnId, lockId);
       } catch (NoSuchLockException e) {
         LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId));
         throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
@@ -445,7 +492,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   private Heartbeater startHeartbeat(long initialDelay) throws LockException {
     long heartbeatInterval = getHeartbeatInterval(conf);
     assert heartbeatInterval > 0;
-    Heartbeater heartbeater = new Heartbeater(this, conf);
+    Heartbeater heartbeater = new Heartbeater(this, conf, queryId);
     // For negative testing purpose..
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
       initialDelay = 0;
@@ -552,24 +599,42 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   private synchronized void initHeartbeatExecutorService() {
-    if (heartbeatExecutorService != null
-        && !heartbeatExecutorService.isShutdown()
+    if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown()
         && !heartbeatExecutorService.isTerminated()) {
       return;
     }
-
-    int threadPoolSize = conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
+    heartbeaterThreadPoolSize =
+        conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
     heartbeatExecutorService =
-        Executors.newScheduledThreadPool(threadPoolSize, new ThreadFactory() {
+        Executors.newScheduledThreadPool(heartbeaterThreadPoolSize, new ThreadFactory() {
           private final AtomicInteger threadCounter = new AtomicInteger();
+
           @Override
           public Thread newThread(Runnable r) {
-            return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement());
+            return new HeartbeaterThread(r, "Heartbeater-" + threadCounter.getAndIncrement());
           }
         });
     ((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true);
   }
 
+  public static class HeartbeaterThread extends Thread {
+    public HeartbeaterThread(Runnable target, String name) {
+      super(target, name);
+    }
+
+    @Override
+    /**
+     * We're overriding finalize so that we can do an orderly cleanup of resources held by
+     * the threadlocal metastore client.
+     */
+    protected void finalize() throws Throwable {
+      threadLocalMSClient.remove();
+      // Adjust the metastore client count
+      DbTxnManager.heartbeaterMSClientCount.decrementAndGet();
+      super.finalize();
+    }
+  }
+
   @Override
   public boolean isTxnOpen() {
     return txnId > 0;
@@ -601,20 +666,21 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   public static class Heartbeater implements Runnable {
     private HiveTxnManager txnMgr;
     private HiveConf conf;
-
     LockException lockException;
+    private final String queryId;
+
     public LockException getLockException() {
       return lockException;
     }
-
     /**
      *
      * @param txnMgr transaction manager for this operation
      */
-    Heartbeater(HiveTxnManager txnMgr, HiveConf conf) {
+    Heartbeater(HiveTxnManager txnMgr, HiveConf conf, String queryId) {
       this.txnMgr = txnMgr;
       this.conf = conf;
       lockException = null;
+      this.queryId = queryId;
     }
 
     /**
@@ -627,12 +693,16 @@ public class DbTxnManager extends HiveTxnManagerImpl {
         if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
           throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true");
         }
-
         LOG.debug("Heartbeating...");
         txnMgr.heartbeat();
       } catch (LockException e) {
-        LOG.error("Failed trying to heartbeat " + e.getMessage());
+        LOG.error("Failed trying to heartbeat queryId=" + queryId + ": " + e.getMessage());
         lockException = e;
+      } catch (Throwable t) {
+        LOG.error("Failed trying to heartbeat queryId=" + queryId + ": " + t.getMessage(), t);
+        lockException =
+            new LockException("Failed trying to heartbeat queryId=" + queryId + ": "
+                + t.getMessage(), t);
       }
     }
   }
@@ -681,5 +751,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
       return client.showLocks(showLocksRequest);
     }
+
+    synchronized void close() {
+      client.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c2e9c59b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 388b7c5..d995e85 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -21,10 +21,13 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -428,26 +431,69 @@ public class TestTxnCommands {
     Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1"));
     
     //now test that we don't timeout locks we should not
-    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 10, TimeUnit.MINUTES);
+    //heartbeater should be running in the background every 1/2 second
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
+    //hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true);
     runStatementOnDriver("start transaction");
     runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
+    pause(750);
+    
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    
+    //since there is txn open, we are heartbeating the txn not individual locks
+    GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
+    Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
+    TxnInfo txnInfo = null;
+    for(TxnInfo ti : txnsInfoResponse.getOpen_txns()) {
+      if(ti.getState() == TxnState.OPEN) {
+        txnInfo = ti;
+        break;
+      }
+    }
+    Assert.assertNotNull(txnInfo);
+    Assert.assertEquals(2, txnInfo.getId());
+    Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
+    String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
+    String[] vals = s.split("\\s+");
+    Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
+    long lastHeartbeat = Long.parseLong(vals[1]);
+    //these 2 values are equal when TXN entry is made.  Should never be equal after 1st heartbeat, which we
+    //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
+    Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);
+    
     ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
     TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0));
+    pause(750);
     TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf);
+    pause(750);
     slr = txnHandler.showLocks(new ShowLocksRequest());
+    Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
     TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0));
-    Assert.assertEquals("Unexpected lock count", 1, slr.getLocks().size());
 
+    pause(750);
     TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf);
     slr = txnHandler.showLocks(new ShowLocksRequest());
+    Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
     TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0));
-    Assert.assertEquals("Unexpected lock count", 1, slr.getLocks().size());
+
+    //should've done several heartbeats
+    s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
+    vals = s.split("\\s+");
+    Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
+    Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")",
+       lastHeartbeat < Long.parseLong(vals[1]));
 
     runStatementOnDriver("rollback");
     slr = txnHandler.showLocks(new ShowLocksRequest());
     Assert.assertEquals("Unexpected lock count", 0, slr.getLocks().size());
   }
+  private static void pause(int timeMillis) {
+    try {
+      Thread.sleep(timeMillis);
+    }
+    catch (InterruptedException e) {
+    }
+  }
 
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()