You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/05/20 17:25:20 UTC
[2/2] hive git commit: HIVE-13249 : Hard upper bound on number of
open transactions (Wei Zheng, reviewed by Eugene Koifman)
HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb3636f3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb3636f3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb3636f3
Branch: refs/heads/branch-1
Commit: cb3636f3fe3e45744eed23a542de05f77a3dd356
Parents: 5fe252b
Author: Wei Zheng <we...@apache.org>
Authored: Fri May 20 10:25:07 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Fri May 20 10:25:07 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +
.../hadoop/hive/metastore/txn/TxnHandler.java | 77 +
.../hadoop/hive/metastore/txn/TxnStore.java | 6 +
.../metastore/txn/TestCompactionTxnHandler.java | 447 -----
.../hive/metastore/txn/TestTxnHandler.java | 1521 ------------------
.../hive/ql/txn/AcidOpenTxnsCounterService.java | 69 +
.../metastore/txn/TestCompactionTxnHandler.java | 447 +++++
.../hive/metastore/txn/TestTxnHandler.java | 1521 ++++++++++++++++++
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 41 +-
9 files changed, 2166 insertions(+), 1969 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4c6aa71..c63c2ca 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1485,6 +1485,12 @@ public class HiveConf extends Configuration {
" of the lock manager is dumped to log file. This is for debugging. See also " +
"hive.lock.numretries and hive.lock.sleep.between.retries."),
+ HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
+ "current open transactions reach this limit, future open transaction requests will be \n" +
+ "rejected, until this number goes below the limit."),
+ HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "1s",
+ new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open transactions."),
+
HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000,
"Maximum number of transactions that can be fetched in one call to open_txns().\n" +
"This controls how many transactions streaming agents such as Flume or Storm open\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 4da5542..27fa820 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -169,6 +170,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ // Maximum number of open transactions that's allowed
+ private static volatile int maxOpenTxns = 0;
+ // Current number of open txns
+ private static volatile long numOpenTxns = 0;
+ // Whether number of open transactions reaches the threshold
+ private static volatile boolean tooManyOpenTxns = false;
+ // The AcidHouseKeeperService for counting open transactions
+ private static volatile HouseKeeperService openTxnsCounter = null;
+
/**
* Number of consecutive deadlocks we have seen
*/
@@ -234,6 +244,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
TimeUnit.MILLISECONDS);
retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
deadlockRetryInterval = retryInterval / 10;
+ maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS);
}
public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -383,7 +394,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return new ValidReadTxnList(exceptions, highWater);
}
+ private static void startHouseKeeperService(HiveConf conf, Class c){
+ try {
+ openTxnsCounter = (HouseKeeperService)c.newInstance();
+ openTxnsCounter.start(conf);
+ } catch (Exception ex) {
+ LOG.error("Failed to start {}" + openTxnsCounter.getClass() +
+ ". The system will not handle {} " + openTxnsCounter.getServiceDescription() +
+ ". Root Cause: ", ex);
+ }
+ }
+
public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
+ if (openTxnsCounter == null) {
+ synchronized (TxnHandler.class) {
+ try {
+ if (openTxnsCounter == null) {
+ startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService"));
+ }
+ } catch (ClassNotFoundException e) {
+ throw new MetaException(e.getMessage());
+ }
+ }
+ }
+ if (!tooManyOpenTxns && numOpenTxns >= maxOpenTxns) {
+ tooManyOpenTxns = true;
+ }
+ if (tooManyOpenTxns) {
+ if (numOpenTxns < maxOpenTxns * 0.9) {
+ tooManyOpenTxns = false;
+ } else {
+ LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " +
+ "reached. Current number of open transactions: " + numOpenTxns);
+ throw new MetaException("Maximum allowed number of open transactions has been reached. " +
+ "See hive.max.open.txns.");
+ }
+ }
+
int numTxns = rqst.getNum_txns();
try {
Connection dbConn = null;
@@ -2893,6 +2940,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ public void countOpenTxns() throws MetaException {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'";
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ LOG.error("Transaction database not properly configured, " +
+ "can't find txn_state from TXNS.");
+ } else {
+ numOpenTxns = rs.getLong(1);
+ }
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ LOG.info("Failed to update number of open transactions");
+ checkRetryable(dbConn, e, "countOpenTxns()");
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ } catch (RetryException e) {
+ countOpenTxns();
+ }
+ }
+
private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
if (connPool != null) return;
http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index dc807df..d739929 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -105,6 +105,12 @@ public interface TxnStore {
public GetOpenTxnsResponse getOpenTxns() throws MetaException;
/**
+ * Get the count for open transactions.
+ * @throws MetaException
+ */
+ public void countOpenTxns() throws MetaException;
+
+ /**
* Open a set of transactions
* @param rqst request to open transactions
* @return information on opened transactions
http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
deleted file mode 100644
index 23ad54e..0000000
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import static junit.framework.Assert.*;
-
-/**
- * Tests for TxnHandler.
- */
-public class TestCompactionTxnHandler {
-
- private HiveConf conf = new HiveConf();
- private TxnStore txnHandler;
-
- public TestCompactionTxnHandler() throws Exception {
- TxnDbUtil.setConfValues(conf);
- LogManager.getLogger(TxnHandler.class.getName()).setLevel(Level.DEBUG);
- tearDown();
- }
-
- @Test
- public void testFindNextToCompact() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- long now = System.currentTimeMillis();
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- assertEquals("foo", ci.dbname);
- assertEquals("bar", ci.tableName);
- assertEquals("ds=today", ci.partName);
- assertEquals(CompactionType.MINOR, ci.type);
- assertNull(ci.runAs);
- assertNull(txnHandler.findNextToCompact("fred"));
-
- txnHandler.setRunAs(ci.id, "bob");
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(1, compacts.size());
- ShowCompactResponseElement c = compacts.get(0);
- assertEquals("foo", c.getDbname());
- assertEquals("bar", c.getTablename());
- assertEquals("ds=today", c.getPartitionname());
- assertEquals(CompactionType.MINOR, c.getType());
- assertEquals("working", c.getState());
- assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now);
- assertEquals("fred", c.getWorkerid());
- assertEquals("bob", c.getRunAs());
- }
-
- @Test
- public void testFindNextToCompact2() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
-
- rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=yesterday");
- txnHandler.compact(rqst);
-
- long now = System.currentTimeMillis();
- boolean expectToday = false;
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- assertEquals("foo", ci.dbname);
- assertEquals("bar", ci.tableName);
- if ("ds=today".equals(ci.partName)) expectToday = false;
- else if ("ds=yesterday".equals(ci.partName)) expectToday = true;
- else fail("partition name should have been today or yesterday but was " + ci.partName);
- assertEquals(CompactionType.MINOR, ci.type);
-
- ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- assertEquals("foo", ci.dbname);
- assertEquals("bar", ci.tableName);
- if (expectToday) assertEquals("ds=today", ci.partName);
- else assertEquals("ds=yesterday", ci.partName);
- assertEquals(CompactionType.MINOR, ci.type);
-
- assertNull(txnHandler.findNextToCompact("fred"));
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(2, compacts.size());
- for (ShowCompactResponseElement e : compacts) {
- assertEquals("working", e.getState());
- assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now);
- assertEquals("fred", e.getWorkerid());
- }
- }
-
- @Test
- public void testFindNextToCompactNothingToCompact() throws Exception {
- assertNull(txnHandler.findNextToCompact("fred"));
- }
-
- @Test
- public void testMarkCompacted() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
-
- txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact("fred"));
-
-
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(1, compacts.size());
- ShowCompactResponseElement c = compacts.get(0);
- assertEquals("foo", c.getDbname());
- assertEquals("bar", c.getTablename());
- assertEquals("ds=today", c.getPartitionname());
- assertEquals(CompactionType.MINOR, c.getType());
- assertEquals("ready for cleaning", c.getState());
- assertNull(c.getWorkerid());
- }
-
- @Test
- public void testFindNextToClean() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- assertEquals(0, txnHandler.findReadyToClean().size());
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
-
- assertEquals(0, txnHandler.findReadyToClean().size());
- txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact("fred"));
-
- List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- assertEquals(1, toClean.size());
- assertNull(txnHandler.findNextToCompact("fred"));
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(1, compacts.size());
- ShowCompactResponseElement c = compacts.get(0);
- assertEquals("foo", c.getDbname());
- assertEquals("bar", c.getTablename());
- assertEquals("ds=today", c.getPartitionname());
- assertEquals(CompactionType.MINOR, c.getType());
- assertEquals("ready for cleaning", c.getState());
- assertNull(c.getWorkerid());
- }
-
- @Test
- public void testMarkCleaned() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- assertEquals(0, txnHandler.findReadyToClean().size());
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
-
- assertEquals(0, txnHandler.findReadyToClean().size());
- txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact("fred"));
-
- List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- assertEquals(1, toClean.size());
- assertNull(txnHandler.findNextToCompact("fred"));
- txnHandler.markCleaned(ci);
- assertNull(txnHandler.findNextToCompact("fred"));
- assertEquals(0, txnHandler.findReadyToClean().size());
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- assertEquals(1, rsp.getCompactsSize());
- assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
- }
-
- @Test
- public void testRevokeFromLocalWorkers() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- txnHandler.compact(rqst);
- rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
- txnHandler.compact(rqst);
- rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
- txnHandler.compact(rqst);
- assertNotNull(txnHandler.findNextToCompact("fred-193892"));
- assertNotNull(txnHandler.findNextToCompact("bob-193892"));
- assertNotNull(txnHandler.findNextToCompact("fred-193893"));
- txnHandler.revokeFromLocalWorkers("fred");
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(3, compacts.size());
- boolean sawWorkingBob = false;
- int initiatedCount = 0;
- for (ShowCompactResponseElement c : compacts) {
- if (c.getState().equals("working")) {
- assertEquals("bob-193892", c.getWorkerid());
- sawWorkingBob = true;
- } else if (c.getState().equals("initiated")) {
- initiatedCount++;
- } else {
- fail("Unexpected state");
- }
- }
- assertTrue(sawWorkingBob);
- assertEquals(2, initiatedCount);
- }
-
- @Test
- public void testRevokeTimedOutWorkers() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- txnHandler.compact(rqst);
- rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
- txnHandler.compact(rqst);
-
- assertNotNull(txnHandler.findNextToCompact("fred-193892"));
- Thread.sleep(200);
- assertNotNull(txnHandler.findNextToCompact("fred-193892"));
- txnHandler.revokeTimedoutWorkers(100);
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(2, compacts.size());
- boolean sawWorking = false, sawInitiated = false;
- for (ShowCompactResponseElement c : compacts) {
- if (c.getState().equals("working")) sawWorking = true;
- else if (c.getState().equals("initiated")) sawInitiated = true;
- else fail("Unexpected state");
- }
- assertTrue(sawWorking);
- assertTrue(sawInitiated);
- }
-
- @Test
- public void testFindPotentialCompactions() throws Exception {
- // Test that committing unlocks
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.UPDATE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
- "mydb");
- comp.setTablename("yourtable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.UPDATE);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.commitTxn(new CommitTxnRequest(txnid));
- assertEquals(0, txnHandler.numLocksInLockTable());
-
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
- assertEquals(2, potentials.size());
- boolean sawMyTable = false, sawYourTable = false;
- for (CompactionInfo ci : potentials) {
- sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") &&
- ci.partName == null);
- sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") &&
- ci.partName.equals("mypartition"));
- }
- assertTrue(sawMyTable);
- assertTrue(sawYourTable);
- }
-
- // TODO test changes to mark cleaned to clean txns and txn_components
-
- @Test
- public void testMarkCleanedCleansTxnsAndTxnComponents()
- throws Exception {
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.INSERT);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
- txnid = openTxn();
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("yourtable");
- comp.setOperationType(DataOperationType.DELETE);
- components = new ArrayList<LockComponent>(1);
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
- txnid = openTxn();
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("foo");
- comp.setPartitionname("bar");
- comp.setOperationType(DataOperationType.UPDATE);
- components = new ArrayList<LockComponent>(1);
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("foo");
- comp.setPartitionname("baz");
- comp.setOperationType(DataOperationType.UPDATE);
- components = new ArrayList<LockComponent>(1);
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
- CompactionInfo ci = new CompactionInfo();
-
- // Now clean them and check that they are removed from the count.
- CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
- txnHandler.compact(rqst);
- assertEquals(0, txnHandler.findReadyToClean().size());
- ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- txnHandler.markCompacted(ci);
-
- List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- assertEquals(1, toClean.size());
- txnHandler.markCleaned(ci);
-
- // Check that we are cleaning up the empty aborted transactions
- GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
- assertEquals(3, txnList.getOpen_txnsSize());
- txnHandler.cleanEmptyAbortedTxns();
- txnList = txnHandler.getOpenTxns();
- assertEquals(2, txnList.getOpen_txnsSize());
-
- rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
- rqst.setPartitionname("bar");
- txnHandler.compact(rqst);
- assertEquals(0, txnHandler.findReadyToClean().size());
- ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- txnHandler.markCompacted(ci);
-
- toClean = txnHandler.findReadyToClean();
- assertEquals(1, toClean.size());
- txnHandler.markCleaned(ci);
-
- txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
- txnHandler.cleanEmptyAbortedTxns();
- txnList = txnHandler.getOpenTxns();
- assertEquals(3, txnList.getOpen_txnsSize());
- }
-
- @Test
- public void addDynamicPartitions() throws Exception {
- String dbName = "default";
- String tableName = "adp_table";
- OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
- long txnId = openTxns.getTxn_ids().get(0);
- // lock a table, as in dynamic partitions
- LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
- lc.setTablename(tableName);
- DataOperationType dop = DataOperationType.UPDATE;
- lc.setOperationType(dop);
- LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
- lr.setTxnid(txnId);
- LockResponse lock = txnHandler.lock(lr);
- assertEquals(LockState.ACQUIRED, lock.getState());
-
- AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName,
- Arrays.asList("ds=yesterday", "ds=today"));
- adp.setOperationType(dop);
- txnHandler.addDynamicPartitions(adp);
- txnHandler.commitTxn(new CommitTxnRequest(txnId));
-
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000);
- assertEquals(2, potentials.size());
- SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials);
-
- int i = 0;
- for (CompactionInfo ci : sorted) {
- assertEquals(dbName, ci.dbname);
- assertEquals(tableName, ci.tableName);
- switch (i++) {
- case 0: assertEquals("ds=today", ci.partName); break;
- case 1: assertEquals("ds=yesterday", ci.partName); break;
- default: throw new RuntimeException("What?");
- }
- }
- }
-
- @Before
- public void setUp() throws Exception {
- TxnDbUtil.prepDb();
- txnHandler = TxnUtils.getTxnStore(conf);
- }
-
- @After
- public void tearDown() throws Exception {
- TxnDbUtil.cleanDb();
- }
-
- private long openTxn() throws MetaException {
- List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
- return txns.get(0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/cb3636f3/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
deleted file mode 100644
index 0d4fc59..0000000
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ /dev/null
@@ -1,1521 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.junit.*;
-import org.apache.hadoop.util.StringUtils;
-
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-
-/**
- * Tests for TxnHandler.
- */
-public class TestTxnHandler {
- static final private String CLASS_NAME = TxnHandler.class.getName();
- static final private Log LOG = LogFactory.getLog(CLASS_NAME);
-
- private HiveConf conf = new HiveConf();
- private TxnStore txnHandler;
-
- public TestTxnHandler() throws Exception {
- TxnDbUtil.setConfValues(conf);
- LogManager.getLogger(TxnHandler.class.getName()).setLevel(Level.DEBUG);
- tearDown();
- }
-
- @Test
- public void testValidTxnsEmpty() throws Exception {
- GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
- assertEquals(0L, txnsInfo.getTxn_high_water_mark());
- assertTrue(txnsInfo.getOpen_txns().isEmpty());
- GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
- assertEquals(0L, txns.getTxn_high_water_mark());
- assertTrue(txns.getOpen_txns().isEmpty());
- }
-
- @Test
- public void testOpenTxn() throws Exception {
- long first = openTxn();
- assertEquals(1L, first);
- long second = openTxn();
- assertEquals(2L, second);
- GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
- assertEquals(2L, txnsInfo.getTxn_high_water_mark());
- assertEquals(2, txnsInfo.getOpen_txns().size());
- assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
- assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(0).getState());
- assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
- assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
- assertEquals("me", txnsInfo.getOpen_txns().get(1).getUser());
- assertEquals("localhost", txnsInfo.getOpen_txns().get(1).getHostname());
-
- GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
- assertEquals(2L, txns.getTxn_high_water_mark());
- assertEquals(2, txns.getOpen_txns().size());
- boolean[] saw = new boolean[3];
- for (int i = 0; i < saw.length; i++) saw[i] = false;
- for (Long tid : txns.getOpen_txns()) {
- saw[tid.intValue()] = true;
- }
- for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
- }
-
- @Test
- public void testAbortTxn() throws Exception {
- OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost"));
- List<Long> txnList = openedTxns.getTxn_ids();
- long first = txnList.get(0);
- assertEquals(1L, first);
- long second = txnList.get(1);
- assertEquals(2L, second);
- txnHandler.abortTxn(new AbortTxnRequest(1));
- GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
- assertEquals(2L, txnsInfo.getTxn_high_water_mark());
- assertEquals(2, txnsInfo.getOpen_txns().size());
- assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
- assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
- assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
- assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
-
- GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
- assertEquals(2L, txns.getTxn_high_water_mark());
- assertEquals(2, txns.getOpen_txns().size());
- boolean[] saw = new boolean[3];
- for (int i = 0; i < saw.length; i++) saw[i] = false;
- for (Long tid : txns.getOpen_txns()) {
- saw[tid.intValue()] = true;
- }
- for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
- }
-
- @Test
- public void testAbortInvalidTxn() throws Exception {
- boolean caught = false;
- try {
- txnHandler.abortTxn(new AbortTxnRequest(195L));
- } catch (NoSuchTxnException e) {
- caught = true;
- }
- assertTrue(caught);
- }
-
- @Test
- public void testValidTxnsNoneOpen() throws Exception {
- txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost"));
- txnHandler.commitTxn(new CommitTxnRequest(1));
- txnHandler.commitTxn(new CommitTxnRequest(2));
- GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
- assertEquals(2L, txnsInfo.getTxn_high_water_mark());
- assertEquals(0, txnsInfo.getOpen_txns().size());
- GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
- assertEquals(2L, txns.getTxn_high_water_mark());
- assertEquals(0, txns.getOpen_txns().size());
- }
-
- @Test
- public void testValidTxnsSomeOpen() throws Exception {
- txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
- txnHandler.abortTxn(new AbortTxnRequest(1));
- txnHandler.commitTxn(new CommitTxnRequest(2));
- GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
- assertEquals(3L, txnsInfo.getTxn_high_water_mark());
- assertEquals(2, txnsInfo.getOpen_txns().size());
- assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
- assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
- assertEquals(3L, txnsInfo.getOpen_txns().get(1).getId());
- assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
-
- GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
- assertEquals(3L, txns.getTxn_high_water_mark());
- assertEquals(2, txns.getOpen_txns().size());
- boolean[] saw = new boolean[4];
- for (int i = 0; i < saw.length; i++) saw[i] = false;
- for (Long tid : txns.getOpen_txns()) {
- saw[tid.intValue()] = true;
- }
- assertTrue(saw[1]);
- assertFalse(saw[2]);
- assertTrue(saw[3]);
- }
-
- @Test
- public void testLockDifferentDBs() throws Exception {
- // Test that two different databases don't collide on their locks
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testLockSameDB() throws Exception {
- // Test that two different databases don't collide on their locks
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockDbLocksTable() throws Exception {
- // Test that locking a database prevents locking of tables in the database
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- comp.setTablename("mytable");
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockDbDoesNotLockTableInDifferentDB() throws Exception {
- // Test that locking a database prevents locking of tables in the database
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
- comp.setOperationType(DataOperationType.NO_TXN);
- comp.setTablename("mytable");
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testLockDifferentTables() throws Exception {
- // Test that two different tables don't collide on their locks
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- comp.setTablename("mytable");
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- comp.setTablename("yourtable");
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testLockSameTable() throws Exception {
- // Test that two different tables don't collide on their locks
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockTableLocksPartition() throws Exception {
- // Test that locking a table prevents locking of partitions of the table
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockDifferentTableDoesntLockPartition() throws Exception {
- // Test that locking a table prevents locking of partitions of the table
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("yourtable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testLockDifferentPartitions() throws Exception {
- // Test that two different partitions don't collide on their locks
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("yourpartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testLockSamePartition() throws Exception {
- // Test that two different partitions don't collide on their locks
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockSRSR() throws Exception {
- // Test that two shared read locks can share a partition
- LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.INSERT);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.SELECT);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testLockESRSR() throws Exception {
- // Test that exclusive lock blocks shared reads
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.INSERT);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
-
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.SELECT);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockSRSW() throws Exception {
- // Test that write can acquire after read
- LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.INSERT);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testLockESRSW() throws Exception {
- // Test that exclusive lock blocks read and write
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.SELECT);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.UPDATE);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockSRE() throws Exception {
- // Test that read blocks exclusive
- LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.SELECT);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockESRE() throws Exception {
- // Test that exclusive blocks read and exclusive
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.SELECT);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockSWSR() throws Exception {
- // Test that read can acquire after write
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.UPDATE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.SELECT);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testLockSWSWSR() throws Exception {
- // Test that write blocks write but read can still acquire
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.UPDATE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
-
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.INSERT);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testWrongLockForOperation() throws Exception {
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- Exception expectedError = null;
- try {
- LockResponse res = txnHandler.lock(req);
- }
- catch(Exception e) {
- expectedError = e;
- }
- Assert.assertTrue(expectedError != null && expectedError.getMessage().contains("Unexpected DataOperationType"));
- }
- @Test
- public void testLockSWSWSW() throws Exception {
- // Test that write blocks two writes
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockEESW() throws Exception {
- // Test that exclusive blocks exclusive and write
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testLockEESR() throws Exception {
- // Test that exclusive blocks exclusive and read
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
-
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.SELECT);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.WAITING);
- }
-
- @Test
- public void testCheckLockAcquireAfterWaiting() throws Exception {
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- long txnId = openTxn();
- req.setTxnid(txnId);
- LockResponse res = txnHandler.lock(req);
- long lockid1 = res.getLockid();
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.UPDATE);
- components.clear();
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(openTxn());
- res = txnHandler.lock(req);
- long lockid2 = res.getLockid();
- assertTrue(res.getState() == LockState.WAITING);
-
- txnHandler.abortTxn(new AbortTxnRequest(txnId));
- res = txnHandler.checkLock(new CheckLockRequest(lockid2));
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testCheckLockNoSuchLock() throws Exception {
- try {
- txnHandler.checkLock(new CheckLockRequest(23L));
- fail("Allowed to check lock on non-existent lock");
- } catch (NoSuchLockException e) {
- }
- }
-
- @Test
- public void testCheckLockTxnAborted() throws Exception {
- // Test that when a transaction is aborted, the heartbeat fails
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- long lockid = res.getLockid();
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
- try {
- // This will throw NoSuchLockException (even though it's the
- // transaction we've closed) because that will have deleted the lock.
- txnHandler.checkLock(new CheckLockRequest(lockid));
- fail("Allowed to check lock on aborted transaction.");
- } catch (NoSuchLockException e) {
- }
- }
-
- @Test
- public void testMultipleLock() throws Exception {
- // Test more than one lock can be handled in a lock request
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(2);
- components.add(comp);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("anotherpartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- long lockid = res.getLockid();
- assertTrue(res.getState() == LockState.ACQUIRED);
- res = txnHandler.checkLock(new CheckLockRequest(lockid));
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.unlock(new UnlockRequest(lockid));
- assertEquals(0, txnHandler.numLocksInLockTable());
- }
-
- @Test
- public void testMultipleLockWait() throws Exception {
- // Test that two shared read locks can share a partition
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(2);
- components.add(comp);
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("anotherpartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- long lockid1 = res.getLockid();
- assertTrue(res.getState() == LockState.ACQUIRED);
-
-
- comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- components = new ArrayList<LockComponent>(1);
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- res = txnHandler.lock(req);
- long lockid2 = res.getLockid();
- assertTrue(res.getState() == LockState.WAITING);
-
- txnHandler.unlock(new UnlockRequest(lockid1));
-
- res = txnHandler.checkLock(new CheckLockRequest(lockid2));
- assertTrue(res.getState() == LockState.ACQUIRED);
- }
-
- @Test
- public void testUnlockOnCommit() throws Exception {
- // Test that committing unlocks
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.DELETE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.commitTxn(new CommitTxnRequest(txnid));
- assertEquals(0, txnHandler.numLocksInLockTable());
- }
-
- @Test
- public void testUnlockOnAbort() throws Exception {
- // Test that committing unlocks
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.UPDATE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
- assertEquals(0, txnHandler.numLocksInLockTable());
- }
-
- @Test
- public void testUnlockWithTxn() throws Exception {
- LOG.debug("Starting testUnlockWithTxn");
- // Test that attempting to unlock locks associated with a transaction
- // generates an error
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.DELETE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- long lockid = res.getLockid();
- try {
- txnHandler.unlock(new UnlockRequest(lockid));
- fail("Allowed to unlock lock associated with transaction.");
- } catch (TxnOpenException e) {
- }
- }
-
- @Test
- public void testHeartbeatTxnAborted() throws Exception {
- // Test that when a transaction is aborted, the heartbeat fails
- openTxn();
- txnHandler.abortTxn(new AbortTxnRequest(1));
- HeartbeatRequest h = new HeartbeatRequest();
- h.setTxnid(1);
- try {
- txnHandler.heartbeat(h);
- fail("Told there was a txn, when it should have been aborted.");
- } catch (TxnAbortedException e) {
- }
- }
-
- @Test
- public void testHeartbeatNoTxn() throws Exception {
- // Test that when a transaction is aborted, the heartbeat fails
- HeartbeatRequest h = new HeartbeatRequest();
- h.setTxnid(939393L);
- try {
- txnHandler.heartbeat(h);
- fail("Told there was a txn, when there wasn't.");
- } catch (NoSuchTxnException e) {
- }
- }
-
- @Test
- public void testHeartbeatLock() throws Exception {
- conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.SECONDS);
- HeartbeatRequest h = new HeartbeatRequest();
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- h.setLockid(res.getLockid());
- for (int i = 0; i < 30; i++) {
- try {
- txnHandler.heartbeat(h);
- } catch (NoSuchLockException e) {
- fail("Told there was no lock, when the heartbeat should have kept it.");
- }
- }
- }
-
- @Test
- public void heartbeatTxnRange() throws Exception {
- long txnid = openTxn();
- assertEquals(1, txnid);
- txnid = openTxn();
- txnid = openTxn();
- HeartbeatTxnRangeResponse rsp =
- txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
- assertEquals(0, rsp.getAborted().size());
- assertEquals(0, rsp.getNosuch().size());
- }
-
- @Test
- public void heartbeatTxnRangeOneCommitted() throws Exception {
- long txnid = openTxn();
- assertEquals(1, txnid);
- txnHandler.commitTxn(new CommitTxnRequest(1));
- txnid = openTxn();
- txnid = openTxn();
- HeartbeatTxnRangeResponse rsp =
- txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
- assertEquals(1, rsp.getNosuchSize());
- Long txn = rsp.getNosuch().iterator().next();
- assertEquals(1L, (long)txn);
- assertEquals(0, rsp.getAborted().size());
- }
-
- @Test
- public void heartbeatTxnRangeOneAborted() throws Exception {
- long txnid = openTxn();
- assertEquals(1, txnid);
- txnid = openTxn();
- txnid = openTxn();
- txnHandler.abortTxn(new AbortTxnRequest(3));
- HeartbeatTxnRangeResponse rsp =
- txnHandler.heartbeatTxnRange(new HeartbeatTxnRangeRequest(1, 3));
- assertEquals(1, rsp.getAbortedSize());
- Long txn = rsp.getAborted().iterator().next();
- assertEquals(3L, (long)txn);
- assertEquals(0, rsp.getNosuch().size());
- }
-
- @Test
- public void testLockTimeout() throws Exception {
- long timeout = txnHandler.setTimeout(1);
- try {
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setTablename("mytable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- Thread.sleep(10);
- txnHandler.performTimeOuts();
- txnHandler.checkLock(new CheckLockRequest(res.getLockid()));
- fail("Told there was a lock, when it should have timed out.");
- } catch (NoSuchLockException e) {
- } finally {
- txnHandler.setTimeout(timeout);
- }
- }
-
- @Test
- public void testRecoverManyTimeouts() throws Exception {
- long timeout = txnHandler.setTimeout(1);
- try {
- txnHandler.openTxns(new OpenTxnRequest(503, "me", "localhost"));
- Thread.sleep(10);
- txnHandler.performTimeOuts();
- GetOpenTxnsInfoResponse rsp = txnHandler.getOpenTxnsInfo();
- int numAborted = 0;
- for (TxnInfo txnInfo : rsp.getOpen_txns()) {
- assertEquals(TxnState.ABORTED, txnInfo.getState());
- numAborted++;
- }
- assertEquals(503, numAborted);
- } finally {
- txnHandler.setTimeout(timeout);
- }
-
-
- }
-
- @Test
- public void testHeartbeatNoLock() throws Exception {
- HeartbeatRequest h = new HeartbeatRequest();
- h.setLockid(29389839L);
- try {
- txnHandler.heartbeat(h);
- fail("Told there was a lock, when there wasn't.");
- } catch (NoSuchLockException e) {
- }
- }
-
- @Test
- public void testCompactMajorWithPartition() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MAJOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(1, compacts.size());
- ShowCompactResponseElement c = compacts.get(0);
- assertEquals("foo", c.getDbname());
- assertEquals("bar", c.getTablename());
- assertEquals("ds=today", c.getPartitionname());
- assertEquals(CompactionType.MAJOR, c.getType());
- assertEquals("initiated", c.getState());
- assertEquals(0L, c.getStart());
- }
-
- @Test
- public void testCompactMinorNoPartition() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setRunas("fred");
- txnHandler.compact(rqst);
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(1, compacts.size());
- ShowCompactResponseElement c = compacts.get(0);
- assertEquals("foo", c.getDbname());
- assertEquals("bar", c.getTablename());
- assertNull(c.getPartitionname());
- assertEquals(CompactionType.MINOR, c.getType());
- assertEquals("initiated", c.getState());
- assertEquals(0L, c.getStart());
- assertEquals("fred", c.getRunAs());
- }
-
- @Test
- public void showLocks() throws Exception {
- long begining = System.currentTimeMillis();
- LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
- comp.setOperationType(DataOperationType.NO_TXN);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- LockResponse res = txnHandler.lock(req);
-
- // Open txn
- long txnid = openTxn();
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.SELECT);
- components = new ArrayList<LockComponent>(1);
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- res = txnHandler.lock(req);
-
- // Locks not associated with a txn
- components = new ArrayList<LockComponent>(1);
- comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb");
- comp.setTablename("yourtable");
- comp.setPartitionname("yourpartition");
- comp.setOperationType(DataOperationType.INSERT);
- components.add(comp);
- req = new LockRequest(components, "you", "remotehost");
- res = txnHandler.lock(req);
-
- ShowLocksResponse rsp = txnHandler.showLocks(new ShowLocksRequest());
- List<ShowLocksResponseElement> locks = rsp.getLocks();
- assertEquals(3, locks.size());
- boolean[] saw = new boolean[locks.size()];
- for (int i = 0; i < saw.length; i++) saw[i] = false;
- for (ShowLocksResponseElement lock : locks) {
- if (lock.getLockid() == 1) {
- assertEquals(0, lock.getTxnid());
- assertEquals("mydb", lock.getDbname());
- assertNull(lock.getTablename());
- assertNull(lock.getPartname());
- assertEquals(LockState.ACQUIRED, lock.getState());
- assertEquals(LockType.EXCLUSIVE, lock.getType());
- assertTrue(lock.toString(), 0 != lock.getLastheartbeat());
- assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
- + " and " + System.currentTimeMillis(),
- begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
- assertEquals("me", lock.getUser());
- assertEquals("localhost", lock.getHostname());
- saw[0] = true;
- } else if (lock.getLockid() == 2) {
- assertEquals(1, lock.getTxnid());
- assertEquals("mydb", lock.getDbname());
- assertEquals("mytable", lock.getTablename());
- assertNull(lock.getPartname());
- assertEquals(LockState.WAITING, lock.getState());
- assertEquals(LockType.SHARED_READ, lock.getType());
- assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
- lock.getTxnid() != 0);
- assertEquals(0, lock.getAcquiredat());
- assertEquals("me", lock.getUser());
- assertEquals("localhost", lock.getHostname());
- saw[1] = true;
- } else if (lock.getLockid() == 3) {
- assertEquals(0, lock.getTxnid());
- assertEquals("yourdb", lock.getDbname());
- assertEquals("yourtable", lock.getTablename());
- assertEquals("yourpartition", lock.getPartname());
- assertEquals(LockState.ACQUIRED, lock.getState());
- assertEquals(LockType.SHARED_READ, lock.getType());
- assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
- System.currentTimeMillis() >= lock.getLastheartbeat());
- assertTrue(begining <= lock.getAcquiredat() &&
- System.currentTimeMillis() >= lock.getAcquiredat());
- assertEquals("you", lock.getUser());
- assertEquals("remotehost", lock.getHostname());
- saw[2] = true;
- } else {
- fail("Unknown lock id");
- }
- }
- for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]);
- }
-
- @Test
- @Ignore("Wedges Derby")
- public void deadlockDetected() throws Exception {
- LOG.debug("Starting deadlock test");
- if (txnHandler instanceof TxnHandler) {
- final TxnHandler tHndlr = (TxnHandler)txnHandler;
- Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- Statement stmt = conn.createStatement();
- long now = tHndlr.getDbTime(conn);
- stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
- "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
- "'scooby.com')");
- stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
- "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
- "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
- tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
- "'scooby.com')");
- conn.commit();
- tHndlr.closeDbConn(conn);
-
- final AtomicBoolean sawDeadlock = new AtomicBoolean();
-
- final Connection conn1 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- final Connection conn2 = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
- try {
-
- for (int i = 0; i < 5; i++) {
- Thread t1 = new Thread() {
- @Override
- public void run() {
- try {
- try {
- updateTxns(conn1);
- updateLocks(conn1);
- Thread.sleep(1000);
- conn1.commit();
- LOG.debug("no exception, no deadlock");
- } catch (SQLException e) {
- try {
- tHndlr.checkRetryable(conn1, e, "thread t1");
- LOG.debug("Got an exception, but not a deadlock, SQLState is " +
- e.getSQLState() + " class of exception is " + e.getClass().getName() +
- " msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.RetryException de) {
- LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
- "exception is " + e.getClass().getName() + " msg is <" + e
- .getMessage() + ">");
- sawDeadlock.set(true);
- }
- }
- conn1.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
-
- Thread t2 = new Thread() {
- @Override
- public void run() {
- try {
- try {
- updateLocks(conn2);
- updateTxns(conn2);
- Thread.sleep(1000);
- conn2.commit();
- LOG.debug("no exception, no deadlock");
- } catch (SQLException e) {
- try {
- tHndlr.checkRetryable(conn2, e, "thread t2");
- LOG.debug("Got an exception, but not a deadlock, SQLState is " +
- e.getSQLState() + " class of exception is " + e.getClass().getName() +
- " msg is <" + e.getMessage() + ">");
- } catch (TxnHandler.RetryException de) {
- LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " +
- "exception is " + e.getClass().getName() + " msg is <" + e
- .getMessage() + ">");
- sawDeadlock.set(true);
- }
- }
- conn2.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
-
- t1.start();
- t2.start();
- t1.join();
- t2.join();
- if (sawDeadlock.get()) break;
- }
- assertTrue(sawDeadlock.get());
- } finally {
- conn1.rollback();
- tHndlr.closeDbConn(conn1);
- conn2.rollback();
- tHndlr.closeDbConn(conn2);
- }
- }
- }
-
- /**
- * This cannnot be run against Derby (thus in UT) but it can run againt MySQL.
- * 1. add to metastore/pom.xml
- * <dependency>
- * <groupId>mysql</groupId>
- * <artifactId>mysql-connector-java</artifactId>
- * <version>5.1.30</version>
- * </dependency>
- * 2. Hack in the c'tor of this class
- * conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost/metastore");
- * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hive");
- * conf.setVar(HiveConf.ConfVars.METASTOREPWD, "hive");
- * conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
- * 3. Remove TxnDbUtil.prepDb(); in TxnHandler.checkQFileTestHack()
- *
- */
- @Ignore("multiple threads wedge Derby")
- @Test
- public void testMutexAPI() throws Exception {
- final TxnStore.MutexAPI api = txnHandler.getMutexAPI();
- final AtomicInteger stepTracker = new AtomicInteger(0);
- /**
- * counter = 0;
- * Thread1 counter=1, lock, wait 3s, check counter(should be 2), counter=3, unlock
- * Thread2 counter=2, lock (and block), inc counter, should be 4
- */
- Thread t1 = new Thread("MutexTest1") {
- public void run() {
- try {
- stepTracker.incrementAndGet();//now 1
- TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
- Thread.sleep(4000);
- //stepTracker should now be 2 which indicates t2 has started
- Assert.assertEquals("Thread2 should have started by now but not done work", 2, stepTracker.get());
- stepTracker.incrementAndGet();//now 3
- handle.releaseLocks();
- }
- catch(Exception ex) {
- throw new RuntimeException(ex.getMessage(), ex);
- }
- }
- };
- t1.setDaemon(true);
- ErrorHandle ueh1 = new ErrorHandle();
- t1.setUncaughtExceptionHandler(ueh1);
- Thread t2 = new Thread("MutexTest2") {
- public void run() {
- try {
- stepTracker.incrementAndGet();//now 2
- //this should block until t1 unlocks
- TxnStore.MutexAPI.LockHandle handle = api.acquireLock(TxnHandler.MUTEX_KEY.HouseKeeper.name());
- stepTracker.incrementAndGet();//now 4
- Assert.assertEquals(4, stepTracker.get());
- handle.releaseLocks();
- stepTracker.incrementAndGet();//now 5
- }
- catch(Exception ex) {
- throw new RuntimeException(ex.getMessage(), ex);
- }
- }
- };
- t2.setDaemon(true);
- ErrorHandle ueh2 = new ErrorHandle();
- t2.setUncaughtExceptionHandler(ueh2);
- t1.start();
- try {
- Thread.sleep(1000);
- }
- catch(InterruptedException ex) {
- LOG.info("Sleep was interrupted");
- }
- t2.start();
- t1.join(6000);//so that test doesn't block
- t2.join(6000);
-
- if(ueh1.error != null) {
- Assert.assertTrue("Unexpected error from t1: " + StringUtils.stringifyException(ueh1.error), false);
- }
- if (ueh2.error != null) {
- Assert.assertTrue("Unexpected error from t2: " + StringUtils.stringifyException(ueh2.error), false);
- }
- Assert.assertEquals("5 means both threads have completed", 5, stepTracker.get());
- }
- private final static class ErrorHandle implements Thread.UncaughtExceptionHandler {
- Throwable error = null;
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.error("Uncaught exception from " + t.getName() + ": " + e.getMessage());
- error = e;
- }
- }
-
- @Test
- public void testRetryableRegex() throws Exception {
- SQLException sqlException = new SQLException("ORA-08177: can't serialize access for this transaction", "72000");
- // Note that we have 3 regex'es below
- conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, "^Deadlock detected, roll back,.*08177.*,.*08178.*");
- boolean result = TxnHandler.isRetryable(conf, sqlException);
- Assert.assertTrue("regex should be retryable", result);
-
- sqlException = new SQLException("This error message, has comma in it");
- conf.setVar(HiveConf.ConfVars.HIVE_TXN_RETRYABLE_SQLEX_REGEX, ".*comma.*");
- result = TxnHandler.isRetryable(conf, sqlException);
- Assert.assertTrue("regex should be retryable", result);
- }
-
- private void updateTxns(Connection conn) throws SQLException {
- Statement stmt = conn.createStatement();
- stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
- }
-
- private void updateLocks(Connection conn) throws SQLException {
- Statement stmt = conn.createStatement();
- stmt.executeUpdate("update HIVE_LOCKS set hl_last_heartbeat = hl_last_heartbeat + 1");
- }
-
- @Test
- public void testBuildQueryWithINClause() throws Exception {
- List<String> queries = new ArrayList<String>();
-
- StringBuilder prefix = new StringBuilder();
- StringBuilder suffix = new StringBuilder();
-
- // Note, this is a "real" query that depends on one of the metastore tables
- prefix.append("select count(*) from TXNS where ");
- suffix.append(" and TXN_STATE = 'o'");
-
- // Case 1 - Max in list members: 10; Max query string length: 1KB
- // The first query happens to have 2 full batches.
- conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1);
- conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10);
- List<Long> inList = new ArrayList<Long>();
- for (long i = 1; i <= 200; i++) {
- inList.add(i);
- }
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
- Assert.assertEquals(1, queries.size());
- runAgainstDerby(queries);
-
- // Case 2 - Max in list members: 10; Max query string length: 1KB
- // The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
- queries.clear();
- inList.add((long)201);
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
- Assert.assertEquals(2, queries.size());
- runAgainstDerby(queries);
-
- // Case 3 - Max in list members: 1000; Max query string length: 5KB
- conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 10);
- conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000);
- queries.clear();
- for (long i = 202; i <= 4321; i++) {
- inList.add(i);
- }
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
- Assert.assertEquals(3, queries.size());
- runAgainstDerby(queries);
-
- // Case 4 - NOT IN list
- queries.clear();
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
- Assert.assertEquals(3, queries.size());
- runAgainstDerby(queries);
-
- // Case 5 - No parenthesis
- queries.clear();
- suffix.setLength(0);
- suffix.append("");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
- Assert.assertEquals(3, queries.size());
- runAgainstDerby(queries);
- }
-
- /** Verify queries can run against Derby DB.
- * As long as Derby doesn't complain, we assume the query is syntactically/semantically correct.
- */
- private void runAgainstDerby(List<String> queries) throws Exception {
- Connection conn = null;
- Statement stmt = null;
- ResultSet rs = null;
-
- try {
- conn = TxnDbUtil.getConnection();
- stmt = conn.createStatement();
- for (String query : queries) {
- rs = stmt.executeQuery(query);
- Assert.assertTrue("The query is not valid", rs.next());
- }
- } finally {
- TxnDbUtil.closeResources(conn, stmt, rs);
- }
- }
-
- @Before
- public void setUp() throws Exception {
- TxnDbUtil.prepDb();
- txnHandler = TxnUtils.getTxnStore(conf);
- }
-
- @After
- public void tearDown() throws Exception {
- TxnDbUtil.cleanDb();
- }
-
- private long openTxn() throws MetaException {
- List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
- return txns.get(0);
- }
-
-}