You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2016/07/02 01:42:46 UTC
hive git commit: HIVE-13725: ACID: Streaming API should synchronize
calls when multiple threads use the same endpoint (Eugene Koifman,
Vaibhav Gumashta, reviewed by Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/branch-2.1 21da55964 -> c2e9c59b4
HIVE-13725: ACID: Streaming API should synchronize calls when multiple threads use the same endpoint (Eugene Koifman, Vaibhav Gumashta, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c2e9c59b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c2e9c59b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c2e9c59b
Branch: refs/heads/branch-2.1
Commit: c2e9c59b4329059cc1fff757394aac911adef51c
Parents: 21da559
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Sun Jun 26 23:23:46 2016 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Fri Jul 1 18:37:13 2016 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/HiveEndPoint.java | 24 +++--
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 11 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 102 ++++++++++++++++---
.../apache/hadoop/hive/ql/TestTxnCommands.java | 52 +++++++++-
4 files changed, 160 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c2e9c59b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 017f565..452cb15 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -277,6 +277,7 @@ public class HiveEndPoint {
private static class ConnectionImpl implements StreamingConnection {
private final IMetaStoreClient msClient;
+ private final IMetaStoreClient heartbeaterMSClient;
private final HiveEndPoint endPt;
private final UserGroupInformation ugi;
private final String username;
@@ -309,6 +310,9 @@ public class HiveEndPoint {
}
this.secureMode = ugi==null ? false : ugi.hasKerberosCredentials();
this.msClient = getMetaStoreClient(endPoint, conf, secureMode);
+ // We use a separate metastore client for heartbeat calls to ensure heartbeat RPC calls are
+ // isolated from the other transaction related RPC calls.
+ this.heartbeaterMSClient = getMetaStoreClient(endPoint, conf, secureMode);
checkEndPoint(endPoint, msClient);
if (createPart && !endPoint.partitionVals.isEmpty()) {
createPartitionIfNotExists(endPoint, msClient, conf);
@@ -366,6 +370,7 @@ public class HiveEndPoint {
public void close() {
if (ugi==null) {
msClient.close();
+ heartbeaterMSClient.close();
return;
}
try {
@@ -374,6 +379,7 @@ public class HiveEndPoint {
@Override
public Void run() throws Exception {
msClient.close();
+ heartbeaterMSClient.close();
return null;
}
} );
@@ -429,8 +435,8 @@ public class HiveEndPoint {
private TransactionBatch fetchTransactionBatchImpl(int numTransactions,
RecordWriter recordWriter)
throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
- return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient
- , recordWriter, agentInfo);
+ return new TransactionBatchImpl(username, ugi, endPt, numTransactions, msClient,
+ heartbeaterMSClient, recordWriter, agentInfo);
}
@@ -541,14 +547,14 @@ public class HiveEndPoint {
+ endPoint.metaStoreUri + ". " + e.getMessage(), e);
}
}
-
-
} // class ConnectionImpl
+
private static class TransactionBatchImpl implements TransactionBatch {
private final String username;
private final UserGroupInformation ugi;
private final HiveEndPoint endPt;
private final IMetaStoreClient msClient;
+ private final IMetaStoreClient heartbeaterMSClient;
private final RecordWriter recordWriter;
private final List<Long> txnIds;
@@ -572,9 +578,9 @@ public class HiveEndPoint {
* @throws TransactionBatchUnAvailable if failed to acquire a new Transaction batch
*/
private TransactionBatchImpl(final String user, UserGroupInformation ugi, HiveEndPoint endPt,
- final int numTxns, final IMetaStoreClient msClient, RecordWriter recordWriter,
- String agentInfo)
- throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
+ final int numTxns, final IMetaStoreClient msClient,
+ final IMetaStoreClient heartbeaterMSClient, RecordWriter recordWriter, String agentInfo)
+ throws StreamingException, TransactionBatchUnAvailable, InterruptedException {
boolean success = false;
try {
if ( endPt.partitionVals!=null && !endPt.partitionVals.isEmpty() ) {
@@ -588,6 +594,7 @@ public class HiveEndPoint {
this.ugi = ugi;
this.endPt = endPt;
this.msClient = msClient;
+ this.heartbeaterMSClient = heartbeaterMSClient;
this.recordWriter = recordWriter;
this.agentInfo = agentInfo;
@@ -937,7 +944,7 @@ public class HiveEndPoint {
Long first = txnIds.get(currentTxnIndex);
Long last = txnIds.get(txnIds.size()-1);
try {
- HeartbeatTxnRangeResponse resp = msClient.heartbeatTxnRange(first, last);
+ HeartbeatTxnRangeResponse resp = heartbeaterMSClient.heartbeatTxnRange(first, last);
if (!resp.getAborted().isEmpty() || !resp.getNosuch().isEmpty()) {
throw new HeartBeatFailure(resp.getAborted(), resp.getNosuch());
}
@@ -1045,5 +1052,4 @@ public class HiveEndPoint {
conf.setBoolVar(var, value);
}
-
} // class HiveEndPoint
http://git-wip-us.apache.org/repos/asf/hive/blob/c2e9c59b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 60674eb..f72c379 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -291,6 +291,9 @@ public final class TxnDbUtil {
}
@VisibleForTesting
public static String queryToString(String query) throws Exception {
+ return queryToString(query, true);
+ }
+ public static String queryToString(String query, boolean includeHeader) throws Exception {
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
@@ -300,10 +303,12 @@ public final class TxnDbUtil {
stmt = conn.createStatement();
rs = stmt.executeQuery(query);
ResultSetMetaData rsmd = rs.getMetaData();
- for(int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
- sb.append(rsmd.getColumnName(colPos)).append(" ");
+ if(includeHeader) {
+ for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+ sb.append(rsmd.getColumnName(colPos)).append(" ");
+ }
+ sb.append('\n');
}
- sb.append('\n');
while(rs.next()) {
for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
sb.append(rs.getObject(colPos)).append(" ");
http://git-wip-us.apache.org/repos/asf/hive/blob/c2e9c59b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 5b6f20c..7b0369d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -85,6 +85,32 @@ public class DbTxnManager extends HiveTxnManagerImpl {
private Runnable shutdownRunner = null;
private static final int SHUTDOWN_HOOK_PRIORITY = 0;
+ // SynchronizedMetaStoreClient object per heartbeater thread.
+ private static ThreadLocal<SynchronizedMetaStoreClient> threadLocalMSClient =
+ new ThreadLocal<SynchronizedMetaStoreClient>() {
+
+ @Override
+ protected SynchronizedMetaStoreClient initialValue() {
+ return null;
+ }
+
+ @Override
+ public synchronized void remove() {
+ SynchronizedMetaStoreClient client = this.get();
+ if (client != null) {
+ client.close();
+ }
+ super.remove();
+ }
+ };
+
+ private static AtomicInteger heartbeaterMSClientCount = new AtomicInteger(0);
+ private int heartbeaterThreadPoolSize = 0;
+
+ private static SynchronizedMetaStoreClient getThreadLocalMSClient() {
+ return threadLocalMSClient.get();
+ }
+
DbTxnManager() {
shutdownRunner = new Runnable() {
@Override
@@ -324,7 +350,6 @@ public class DbTxnManager extends HiveTxnManagerImpl {
return t;
}
/**
- * This is for testing only. Normally client should call {@link #acquireLocks(QueryPlan, Context, String, boolean)}
* @param delay time to delay for first heartbeat
*/
@VisibleForTesting
@@ -418,7 +443,29 @@ public class DbTxnManager extends HiveTxnManagerImpl {
for (HiveLock lock : locks) {
long lockId = ((DbLockManager.DbHiveLock)lock).lockId;
try {
- client.heartbeat(txnId, lockId);
+ // Get the threadlocal metastore client for the heartbeat calls.
+ SynchronizedMetaStoreClient heartbeaterClient = getThreadLocalMSClient();
+ if (heartbeaterClient == null) {
+ Hive db;
+ try {
+ db = Hive.get(conf);
+ // Create a new threadlocal synchronized metastore client for use in hearbeater threads.
+ // This makes the concurrent use of heartbeat thread safe, and won't cause transaction
+ // abort due to a long metastore client call blocking the heartbeat call.
+ heartbeaterClient = new SynchronizedMetaStoreClient(db.getMSC());
+ threadLocalMSClient.set(heartbeaterClient);
+ } catch (HiveException e) {
+ LOG.error("Unable to create new metastore client for heartbeating", e);
+ throw new LockException(e);
+ }
+ // Increment the threadlocal metastore client count
+ if (heartbeaterMSClientCount.incrementAndGet() >= heartbeaterThreadPoolSize) {
+ LOG.warn("The number of hearbeater metastore clients - + "
+ + heartbeaterMSClientCount.get() + ", has exceeded the max limit - "
+ + heartbeaterThreadPoolSize);
+ }
+ }
+ heartbeaterClient.heartbeat(txnId, lockId);
} catch (NoSuchLockException e) {
LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId));
throw new LockException(e, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
@@ -445,7 +492,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
private Heartbeater startHeartbeat(long initialDelay) throws LockException {
long heartbeatInterval = getHeartbeatInterval(conf);
assert heartbeatInterval > 0;
- Heartbeater heartbeater = new Heartbeater(this, conf);
+ Heartbeater heartbeater = new Heartbeater(this, conf, queryId);
// For negative testing purpose..
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
initialDelay = 0;
@@ -552,24 +599,42 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
private synchronized void initHeartbeatExecutorService() {
- if (heartbeatExecutorService != null
- && !heartbeatExecutorService.isShutdown()
+ if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown()
&& !heartbeatExecutorService.isTerminated()) {
return;
}
-
- int threadPoolSize = conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
+ heartbeaterThreadPoolSize =
+ conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE);
heartbeatExecutorService =
- Executors.newScheduledThreadPool(threadPoolSize, new ThreadFactory() {
+ Executors.newScheduledThreadPool(heartbeaterThreadPoolSize, new ThreadFactory() {
private final AtomicInteger threadCounter = new AtomicInteger();
+
@Override
public Thread newThread(Runnable r) {
- return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement());
+ return new HeartbeaterThread(r, "Heartbeater-" + threadCounter.getAndIncrement());
}
});
((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true);
}
+ public static class HeartbeaterThread extends Thread {
+ public HeartbeaterThread(Runnable target, String name) {
+ super(target, name);
+ }
+
+ @Override
+ /**
+ * We're overriding finalize so that we can do an orderly cleanup of resources held by
+ * the threadlocal metastore client.
+ */
+ protected void finalize() throws Throwable {
+ threadLocalMSClient.remove();
+ // Adjust the metastore client count
+ DbTxnManager.heartbeaterMSClientCount.decrementAndGet();
+ super.finalize();
+ }
+ }
+
@Override
public boolean isTxnOpen() {
return txnId > 0;
@@ -601,20 +666,21 @@ public class DbTxnManager extends HiveTxnManagerImpl {
public static class Heartbeater implements Runnable {
private HiveTxnManager txnMgr;
private HiveConf conf;
-
LockException lockException;
+ private final String queryId;
+
public LockException getLockException() {
return lockException;
}
-
/**
*
* @param txnMgr transaction manager for this operation
*/
- Heartbeater(HiveTxnManager txnMgr, HiveConf conf) {
+ Heartbeater(HiveTxnManager txnMgr, HiveConf conf, String queryId) {
this.txnMgr = txnMgr;
this.conf = conf;
lockException = null;
+ this.queryId = queryId;
}
/**
@@ -627,12 +693,16 @@ public class DbTxnManager extends HiveTxnManagerImpl {
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true");
}
-
LOG.debug("Heartbeating...");
txnMgr.heartbeat();
} catch (LockException e) {
- LOG.error("Failed trying to heartbeat " + e.getMessage());
+ LOG.error("Failed trying to heartbeat queryId=" + queryId + ": " + e.getMessage());
lockException = e;
+ } catch (Throwable t) {
+ LOG.error("Failed trying to heartbeat queryId=" + queryId + ": " + t.getMessage(), t);
+ lockException =
+ new LockException("Failed trying to heartbeat queryId=" + queryId + ": "
+ + t.getMessage(), t);
}
}
}
@@ -681,5 +751,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
return client.showLocks(showLocksRequest);
}
+
+ synchronized void close() {
+ client.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c2e9c59b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 388b7c5..d995e85 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -21,10 +21,13 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -428,26 +431,69 @@ public class TestTxnCommands {
Assert.assertTrue("Actual: " + cpr.getErrorMessage(), cpr.getErrorMessage().contains("Transaction manager has aborted the transaction txnid:1"));
//now test that we don't timeout locks we should not
- hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 10, TimeUnit.MINUTES);
+ //heartbeater should be running in the background every 1/2 second
+ hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
+ //hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true);
runStatementOnDriver("start transaction");
runStatementOnDriver("select count(*) from " + Table.ACIDTBL + " where a = 17");
+ pause(750);
+
TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+ //since there is txn open, we are heartbeating the txn not individual locks
+ GetOpenTxnsInfoResponse txnsInfoResponse = txnHandler.getOpenTxnsInfo();
+ Assert.assertEquals(2, txnsInfoResponse.getOpen_txns().size());
+ TxnInfo txnInfo = null;
+ for(TxnInfo ti : txnsInfoResponse.getOpen_txns()) {
+ if(ti.getState() == TxnState.OPEN) {
+ txnInfo = ti;
+ break;
+ }
+ }
+ Assert.assertNotNull(txnInfo);
+ Assert.assertEquals(2, txnInfo.getId());
+ Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
+ String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
+ String[] vals = s.split("\\s+");
+ Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
+ long lastHeartbeat = Long.parseLong(vals[1]);
+ //these 2 values are equal when TXN entry is made. Should never be equal after 1st heartbeat, which we
+ //expect to have happened by now since HIVE_TXN_TIMEOUT=1sec
+ Assert.assertNotEquals("Didn't see heartbeat happen", Long.parseLong(vals[0]), lastHeartbeat);
+
ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0));
+ pause(750);
TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf);
+ pause(750);
slr = txnHandler.showLocks(new ShowLocksRequest());
+ Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0));
- Assert.assertEquals("Unexpected lock count", 1, slr.getLocks().size());
+ pause(750);
TestTxnCommands2.runHouseKeeperService(houseKeeperService, hiveConf);
slr = txnHandler.showLocks(new ShowLocksRequest());
+ Assert.assertEquals("Unexpected lock count: " + slr, 1, slr.getLocks().size());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", Table.ACIDTBL.name, null, slr.getLocks().get(0));
- Assert.assertEquals("Unexpected lock count", 1, slr.getLocks().size());
+
+ //should've done several heartbeats
+ s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
+ vals = s.split("\\s+");
+ Assert.assertEquals("Didn't get expected timestamps", 2, vals.length);
+ Assert.assertTrue("Heartbeat didn't progress: (old,new) (" + lastHeartbeat + "," + vals[1]+ ")",
+ lastHeartbeat < Long.parseLong(vals[1]));
runStatementOnDriver("rollback");
slr = txnHandler.showLocks(new ShowLocksRequest());
Assert.assertEquals("Unexpected lock count", 0, slr.getLocks().size());
}
+ private static void pause(int timeMillis) {
+ try {
+ Thread.sleep(timeMillis);
+ }
+ catch (InterruptedException e) {
+ }
+ }
/**
* takes raw data and turns it into a string as if from Driver.getResults()