You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/05/20 16:51:00 UTC
[3/3] hive git commit: HIVE-13249 : Hard upper bound on number of
open transactions (Wei Zheng, reviewed by Eugene Koifman)
HIVE-13249 : Hard upper bound on number of open transactions (Wei Zheng, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/259e8be1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/259e8be1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/259e8be1
Branch: refs/heads/master
Commit: 259e8be1d4486c6a17b8c240e43154c5a839524e
Parents: 360dfa0
Author: Wei Zheng <we...@apache.org>
Authored: Fri May 20 09:50:44 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Fri May 20 09:50:44 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +
.../hadoop/hive/metastore/txn/TxnHandler.java | 79 +
.../hadoop/hive/metastore/txn/TxnStore.java | 6 +
.../metastore/txn/TestCompactionTxnHandler.java | 466 ------
.../hive/metastore/txn/TestTxnHandler.java | 1484 ------------------
.../hive/ql/txn/AcidOpenTxnsCounterService.java | 69 +
.../metastore/txn/TestCompactionTxnHandler.java | 466 ++++++
.../hive/metastore/txn/TestTxnHandler.java | 1484 ++++++++++++++++++
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 41 +-
9 files changed, 2150 insertions(+), 1951 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9cc8fbe..4cfa5f1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1681,6 +1681,12 @@ public class HiveConf extends Configuration {
" of the lock manager is dumped to log file. This is for debugging. See also " +
"hive.lock.numretries and hive.lock.sleep.between.retries."),
+ HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
+ "current open transactions reach this limit, future open transaction requests will be \n" +
+ "rejected, until this number goes below the limit."),
+ HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "1s",
+ new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open transactions."),
+
HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000,
"Maximum number of transactions that can be fetched in one call to open_txns().\n" +
"This controls how many transactions streaming agents such as Flume or Storm open\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index abaff34..82d685d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.HouseKeeperService;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,6 +168,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ // Maximum number of open transactions that's allowed; read from HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS
+ private static volatile int maxOpenTxns = 0;
+ // Number of open txns as of the most recent countOpenTxns() run (not updated by openTxns itself)
+ private static volatile long numOpenTxns = 0;
+ // Set once numOpenTxns reaches maxOpenTxns; openTxns() clears it only after the count drops below 90% of maxOpenTxns
+ private static volatile boolean tooManyOpenTxns = false;
+ // The HouseKeeperService (AcidOpenTxnsCounterService) for counting open transactions; started on demand by openTxns()
+ private static volatile HouseKeeperService openTxnsCounter = null;
+
/**
* Number of consecutive deadlocks we have seen
*/
@@ -236,6 +246,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
TimeUnit.MILLISECONDS);
retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
deadlockRetryInterval = retryInterval / 10;
+ maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS);
}
public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -362,7 +373,45 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return getOpenTxns();
}
}
+
+ /**
+  * Instantiates the given HouseKeeperService implementation, publishes it in
+  * {@code openTxnsCounter} and starts it.
+  * <p>
+  * Failures are logged rather than propagated. The instance is published before
+  * {@code start} is invoked, so a failed start still leaves {@code openTxnsCounter}
+  * non-null, as it did before this change.
+  *
+  * @param conf configuration handed to the service's {@code start} method
+  * @param c    service class; instantiated reflectively through its no-arg
+  *             constructor and cast to HouseKeeperService
+  */
+ private static void startHouseKeeperService(HiveConf conf, Class c){
+   HouseKeeperService service = null;
+   try {
+     service = (HouseKeeperService)c.newInstance();
+     openTxnsCounter = service;
+     service.start(conf);
+   } catch (Exception ex) {
+     // Instantiation itself may be what failed, in which case there is no instance
+     // to describe; dereferencing it here would throw a NullPointerException and
+     // hide the real cause.
+     String description = (service == null) ? "<unknown>" : service.getServiceDescription();
+     // One format string with a placeholder per argument; the trailing Throwable is
+     // logged by SLF4J together with its stack trace.
+     LOG.error("Failed to start {}. The system will not handle {}. Root Cause: ",
+         c, description, ex);
+   }
+ }
+
public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
+ if (openTxnsCounter == null) {
+ synchronized (TxnHandler.class) {
+ try {
+ if (openTxnsCounter == null) {
+ startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService"));
+ }
+ } catch (ClassNotFoundException e) {
+ throw new MetaException(e.getMessage());
+ }
+ }
+ }
+
+ if (!tooManyOpenTxns && numOpenTxns >= maxOpenTxns) {
+ tooManyOpenTxns = true;
+ }
+ if (tooManyOpenTxns) {
+ if (numOpenTxns < maxOpenTxns * 0.9) {
+ tooManyOpenTxns = false;
+ } else {
+ LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " +
+ "reached. Current number of open transactions: " + numOpenTxns);
+ throw new MetaException("Maximum allowed number of open transactions has been reached. " +
+ "See hive.max.open.txns.");
+ }
+ }
+
int numTxns = rqst.getNum_txns();
try {
Connection dbConn = null;
@@ -2856,6 +2905,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ /**
+  * Queries TXNS for the number of rows in the open state and stores the result
+  * in {@code numOpenTxns}. If checkRetryable signals a RetryException the whole
+  * operation is attempted again.
+  */
+ public void countOpenTxns() throws MetaException {
+   Connection conn = null;
+   Statement statement = null;
+   ResultSet results = null;
+   try {
+     try {
+       conn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+       statement = conn.createStatement();
+       String query = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'";
+       LOG.debug("Going to execute query <" + query + ">");
+       results = statement.executeQuery(query);
+       if (results.next()) {
+         // Single-row aggregate: column 1 holds the count.
+         numOpenTxns = results.getLong(1);
+       } else {
+         LOG.error("Transaction database not properly configured, " +
+             "can't find txn_state from TXNS.");
+       }
+     } catch (SQLException sqlEx) {
+       LOG.debug("Going to rollback");
+       rollbackDBConn(conn);
+       LOG.info("Failed to update number of open transactions");
+       // May throw RetryException, which the outer try turns into another attempt.
+       checkRetryable(conn, sqlEx, "countOpenTxns()");
+     } finally {
+       close(results, statement, conn);
+     }
+   } catch (RetryException retry) {
+     countOpenTxns();
+   }
+ }
+
private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
if (connPool != null) return;
http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 12be862..5b56aaf 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -77,6 +77,12 @@ public interface TxnStore {
public GetOpenTxnsResponse getOpenTxns() throws MetaException;
/**
+ * Recount the transactions that are currently open. Nothing is returned to the
+ * caller: the TxnHandler implementation stores the result in a static field that
+ * openTxns consults when enforcing the open-transaction limit.
+ * @throws MetaException presumably if the count cannot be obtained -- TODO confirm
+ */
+ public void countOpenTxns() throws MetaException;
+
+ /**
* Open a set of transactions
* @param rqst request to open transactions
* @return information on opened transactions
http://git-wip-us.apache.org/repos/asf/hive/blob/259e8be1/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
deleted file mode 100644
index f513d0f..0000000
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
-import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.LockComponent;
-import org.apache.hadoop.hive.metastore.api.LockLevel;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-
-/**
- * Tests for TxnHandler.
- */
-public class TestCompactionTxnHandler {
-
- private HiveConf conf = new HiveConf();
- private TxnStore txnHandler;
-
- public TestCompactionTxnHandler() throws Exception {
- TxnDbUtil.setConfValues(conf);
- tearDown();
- }
-
- @Test
- public void testFindNextToCompact() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- long now = System.currentTimeMillis();
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- assertEquals("foo", ci.dbname);
- assertEquals("bar", ci.tableName);
- assertEquals("ds=today", ci.partName);
- assertEquals(CompactionType.MINOR, ci.type);
- assertNull(ci.runAs);
- assertNull(txnHandler.findNextToCompact("fred"));
-
- txnHandler.setRunAs(ci.id, "bob");
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(1, compacts.size());
- ShowCompactResponseElement c = compacts.get(0);
- assertEquals("foo", c.getDbname());
- assertEquals("bar", c.getTablename());
- assertEquals("ds=today", c.getPartitionname());
- assertEquals(CompactionType.MINOR, c.getType());
- assertEquals("working", c.getState());
- assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now);
- assertEquals("fred", c.getWorkerid());
- assertEquals("bob", c.getRunAs());
- }
-
- @Test
- public void testFindNextToCompact2() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
-
- rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=yesterday");
- txnHandler.compact(rqst);
-
- long now = System.currentTimeMillis();
- boolean expectToday = false;
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- assertEquals("foo", ci.dbname);
- assertEquals("bar", ci.tableName);
- if ("ds=today".equals(ci.partName)) expectToday = false;
- else if ("ds=yesterday".equals(ci.partName)) expectToday = true;
- else fail("partition name should have been today or yesterday but was " + ci.partName);
- assertEquals(CompactionType.MINOR, ci.type);
-
- ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- assertEquals("foo", ci.dbname);
- assertEquals("bar", ci.tableName);
- if (expectToday) assertEquals("ds=today", ci.partName);
- else assertEquals("ds=yesterday", ci.partName);
- assertEquals(CompactionType.MINOR, ci.type);
-
- assertNull(txnHandler.findNextToCompact("fred"));
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(2, compacts.size());
- for (ShowCompactResponseElement e : compacts) {
- assertEquals("working", e.getState());
- assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now);
- assertEquals("fred", e.getWorkerid());
- }
- }
-
- @Test
- public void testFindNextToCompactNothingToCompact() throws Exception {
- assertNull(txnHandler.findNextToCompact("fred"));
- }
-
- @Test
- public void testMarkCompacted() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
-
- txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact("fred"));
-
-
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(1, compacts.size());
- ShowCompactResponseElement c = compacts.get(0);
- assertEquals("foo", c.getDbname());
- assertEquals("bar", c.getTablename());
- assertEquals("ds=today", c.getPartitionname());
- assertEquals(CompactionType.MINOR, c.getType());
- assertEquals("ready for cleaning", c.getState());
- assertNull(c.getWorkerid());
- }
-
- @Test
- public void testFindNextToClean() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- assertEquals(0, txnHandler.findReadyToClean().size());
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
-
- assertEquals(0, txnHandler.findReadyToClean().size());
- txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact("fred"));
-
- List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- assertEquals(1, toClean.size());
- assertNull(txnHandler.findNextToCompact("fred"));
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(1, compacts.size());
- ShowCompactResponseElement c = compacts.get(0);
- assertEquals("foo", c.getDbname());
- assertEquals("bar", c.getTablename());
- assertEquals("ds=today", c.getPartitionname());
- assertEquals(CompactionType.MINOR, c.getType());
- assertEquals("ready for cleaning", c.getState());
- assertNull(c.getWorkerid());
- }
-
- @Test
- public void testMarkCleaned() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- rqst.setPartitionname("ds=today");
- txnHandler.compact(rqst);
- assertEquals(0, txnHandler.findReadyToClean().size());
- CompactionInfo ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
-
- assertEquals(0, txnHandler.findReadyToClean().size());
- txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact("fred"));
-
- List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- assertEquals(1, toClean.size());
- assertNull(txnHandler.findNextToCompact("fred"));
- txnHandler.markCleaned(ci);
- assertNull(txnHandler.findNextToCompact("fred"));
- assertEquals(0, txnHandler.findReadyToClean().size());
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- assertEquals(1, rsp.getCompactsSize());
- assertTrue(TxnHandler.SUCCEEDED_RESPONSE.equals(rsp.getCompacts().get(0).getState()));
- }
-
- @Test
- public void testRevokeFromLocalWorkers() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- txnHandler.compact(rqst);
- rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
- txnHandler.compact(rqst);
- rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
- txnHandler.compact(rqst);
- assertNotNull(txnHandler.findNextToCompact("fred-193892"));
- assertNotNull(txnHandler.findNextToCompact("bob-193892"));
- assertNotNull(txnHandler.findNextToCompact("fred-193893"));
- txnHandler.revokeFromLocalWorkers("fred");
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(3, compacts.size());
- boolean sawWorkingBob = false;
- int initiatedCount = 0;
- for (ShowCompactResponseElement c : compacts) {
- if (c.getState().equals("working")) {
- assertEquals("bob-193892", c.getWorkerid());
- sawWorkingBob = true;
- } else if (c.getState().equals("initiated")) {
- initiatedCount++;
- } else {
- fail("Unexpected state");
- }
- }
- assertTrue(sawWorkingBob);
- assertEquals(2, initiatedCount);
- }
-
- @Test
- public void testRevokeTimedOutWorkers() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- txnHandler.compact(rqst);
- rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
- txnHandler.compact(rqst);
-
- assertNotNull(txnHandler.findNextToCompact("fred-193892"));
- Thread.sleep(200);
- assertNotNull(txnHandler.findNextToCompact("fred-193892"));
- txnHandler.revokeTimedoutWorkers(100);
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(2, compacts.size());
- boolean sawWorking = false, sawInitiated = false;
- for (ShowCompactResponseElement c : compacts) {
- if (c.getState().equals("working")) sawWorking = true;
- else if (c.getState().equals("initiated")) sawInitiated = true;
- else fail("Unexpected state");
- }
- assertTrue(sawWorking);
- assertTrue(sawInitiated);
- }
-
- @Test
- public void testFindPotentialCompactions() throws Exception {
- // Test that committing unlocks
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.UPDATE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
- "mydb");
- comp.setTablename("yourtable");
- comp.setPartitionname("mypartition");
- comp.setOperationType(DataOperationType.UPDATE);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.commitTxn(new CommitTxnRequest(txnid));
- assertEquals(0, txnHandler.numLocksInLockTable());
-
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
- assertEquals(2, potentials.size());
- boolean sawMyTable = false, sawYourTable = false;
- for (CompactionInfo ci : potentials) {
- sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") &&
- ci.partName == null);
- sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") &&
- ci.partName.equals("mypartition"));
- }
- assertTrue(sawMyTable);
- assertTrue(sawYourTable);
- }
-
- // TODO test changes to mark cleaned to clean txns and txn_components
-
- @Test
- public void testMarkCleanedCleansTxnsAndTxnComponents()
- throws Exception {
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
- "mydb");
- comp.setTablename("mytable");
- comp.setOperationType(DataOperationType.INSERT);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
- txnid = openTxn();
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("yourtable");
- comp.setOperationType(DataOperationType.DELETE);
- components = new ArrayList<LockComponent>(1);
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
- txnid = openTxn();
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("foo");
- comp.setPartitionname("bar");
- comp.setOperationType(DataOperationType.UPDATE);
- components = new ArrayList<LockComponent>(1);
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
-
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
- comp.setTablename("foo");
- comp.setPartitionname("baz");
- comp.setOperationType(DataOperationType.UPDATE);
- components = new ArrayList<LockComponent>(1);
- components.add(comp);
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- res = txnHandler.lock(req);
- assertTrue(res.getState() == LockState.ACQUIRED);
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
- CompactionInfo ci = new CompactionInfo();
-
- // Now clean them and check that they are removed from the count.
- CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
- txnHandler.compact(rqst);
- assertEquals(0, txnHandler.findReadyToClean().size());
- ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- txnHandler.markCompacted(ci);
-
- List<CompactionInfo> toClean = txnHandler.findReadyToClean();
- assertEquals(1, toClean.size());
- txnHandler.markCleaned(ci);
-
- // Check that we are cleaning up the empty aborted transactions
- GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
- assertEquals(3, txnList.getOpen_txnsSize());
- txnHandler.cleanEmptyAbortedTxns();
- txnList = txnHandler.getOpenTxns();
- assertEquals(2, txnList.getOpen_txnsSize());
-
- rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
- rqst.setPartitionname("bar");
- txnHandler.compact(rqst);
- assertEquals(0, txnHandler.findReadyToClean().size());
- ci = txnHandler.findNextToCompact("fred");
- assertNotNull(ci);
- txnHandler.markCompacted(ci);
-
- toClean = txnHandler.findReadyToClean();
- assertEquals(1, toClean.size());
- txnHandler.markCleaned(ci);
-
- txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
- txnHandler.cleanEmptyAbortedTxns();
- txnList = txnHandler.getOpenTxns();
- assertEquals(3, txnList.getOpen_txnsSize());
- }
-
- @Test
- public void addDynamicPartitions() throws Exception {
- String dbName = "default";
- String tableName = "adp_table";
- OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
- long txnId = openTxns.getTxn_ids().get(0);
- // lock a table, as in dynamic partitions
- LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
- lc.setTablename(tableName);
- DataOperationType dop = DataOperationType.UPDATE;
- lc.setOperationType(dop);
- LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
- lr.setTxnid(txnId);
- LockResponse lock = txnHandler.lock(lr);
- assertEquals(LockState.ACQUIRED, lock.getState());
-
- AddDynamicPartitions adp = new AddDynamicPartitions(txnId, dbName, tableName,
- Arrays.asList("ds=yesterday", "ds=today"));
- adp.setOperationType(dop);
- txnHandler.addDynamicPartitions(adp);
- txnHandler.commitTxn(new CommitTxnRequest(txnId));
-
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000);
- assertEquals(2, potentials.size());
- SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials);
-
- int i = 0;
- for (CompactionInfo ci : sorted) {
- assertEquals(dbName, ci.dbname);
- assertEquals(tableName, ci.tableName);
- switch (i++) {
- case 0: assertEquals("ds=today", ci.partName); break;
- case 1: assertEquals("ds=yesterday", ci.partName); break;
- default: throw new RuntimeException("What?");
- }
- }
- }
-
- @Before
- public void setUp() throws Exception {
- TxnDbUtil.prepDb();
- txnHandler = TxnUtils.getTxnStore(conf);
- }
-
- @After
- public void tearDown() throws Exception {
- TxnDbUtil.cleanDb();
- }
-
- private long openTxn() throws MetaException {
- List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
- return txns.get(0);
- }
-
-}