You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2014/03/05 01:20:57 UTC
svn commit: r1574266 [21/23] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/ metastore/
metastore/if/ metastore/scripts/upgrade/derby/
metastore/scripts/upgrade/mysql/ meta...
Added: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestLockRequestBuilder.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestLockRequestBuilder.java?rev=1574266&view=auto
==============================================================================
--- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestLockRequestBuilder.java (added)
+++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestLockRequestBuilder.java Wed Mar 5 00:20:53 2014
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockLevel;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.List;
+
+/**
+ * Tests for LockRequestBuilder.
+ */
+public class TestLockRequestBuilder {
+
+ // Test failure if user not set
+ @Test
+ public void noUser() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ boolean caughtException = false;
+ try {
+ LockRequest req = bldr.build();
+ } catch (RuntimeException e) {
+ Assert.assertEquals("Cannot build a lock without giving a user", e.getMessage());
+ caughtException = true;
+ }
+ Assert.assertTrue(caughtException);
+ }
+
+ // Test that database and table don't coalesce.
+ @Test
+ public void testDbTable() throws Exception {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(2, locks.size());
+ Assert.assertEquals("fred", req.getUser());
+ Assert.assertEquals(InetAddress.getLocalHost().getHostName(), req.getHostname());
+ }
+
+ // Test that database and table don't coalesce.
+ @Test
+ public void testTablePartition() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser(null);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ bldr.addLockComponent(comp);
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(2, locks.size());
+ Assert.assertEquals("unknown", req.getUser());
+ }
+
+ // Test that 2 separate databases don't coalesce.
+ @Test
+ public void testTwoSeparateDbs() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
+ bldr.addLockComponent(comp);
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(2, locks.size());
+ }
+
+ // Test that 2 exclusive db locks coalesce to one
+ @Test
+ public void testExExDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ }
+
+ // Test that existing exclusive db with new shared_write coalesces to
+ // exclusive
+ @Test
+ public void testExSWDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing exclusive db with new shared_read coalesces to
+ // exclusive
+ @Test
+ public void testExSRDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write db with new exclusive coalesces to
+ // exclusive
+ @Test
+ public void testSWExDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write db with new shared_write coalesces to
+ // shared_write
+ @Test
+ public void testSWSWDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write db with new shared_read coalesces to
+ // shared_write
+ @Test
+ public void testSWSRDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read db with new exclusive coalesces to
+ // exclusive
+ @Test
+ public void testSRExDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read db with new shared_write coalesces to
+ // shared_write
+ @Test
+ public void testSRSWDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read db with new shared_read coalesces to
+ // shared_read
+ @Test
+ public void testSRSRDb() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_READ, locks.get(0).getType());
+ }
+
+ // Test that 2 separate tables don't coalesce.
+ @Test
+ public void testTwoSeparateTables() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("yourtable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(2, locks.size());
+ }
+
+ // Test that 2 exclusive table locks coalesce to one
+ @Test
+ public void testExExTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ }
+
+ // Test that existing exclusive table with new shared_write coalesces to
+ // exclusive
+ @Test
+ public void testExSWTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing exclusive table with new shared_read coalesces to
+ // exclusive
+ @Test
+ public void testExSRTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write table with new exclusive coalesces to
+ // exclusive
+ @Test
+ public void testSWExTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write table with new shared_write coalesces to
+ // shared_write
+ @Test
+ public void testSWSWTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write table with new shared_read coalesces to
+ // shared_write
+ @Test
+ public void testSWSRTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read table with new exclusive coalesces to
+ // exclusive
+ @Test
+ public void testSRExTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read table with new shared_write coalesces to
+ // shared_write
+ @Test
+ public void testSRSWTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read table with new shared_read coalesces to
+ // shared_read
+ @Test
+ public void testSRSRTable() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_READ, locks.get(0).getType());
+ }
+
+ // Test that 2 separate partitions don't coalesce.
+ @Test
+ public void testTwoSeparatePartitions() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("yourpart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(2, locks.size());
+ }
+
+ // Test that 2 exclusive partition locks coalesce to one
+ @Test
+ public void testExExPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ }
+
+ // Test that existing exclusive partition with new shared_write coalesces to
+ // exclusive
+ @Test
+ public void testExSWPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing exclusive partition with new shared_read coalesces to
+ // exclusive
+ @Test
+ public void testExSRPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write partition with new exclusive coalesces to
+ // exclusive
+ @Test
+ public void testSWExPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write partition with new shared_write coalesces to
+ // shared_write
+ @Test
+ public void testSWSWPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_write partition with new shared_read coalesces to
+ // shared_write
+ @Test
+ public void testSWSRPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read partition with new exclusive coalesces to
+ // exclusive
+ @Test
+ public void testSRExPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.EXCLUSIVE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read partition with new shared_write coalesces to
+ // shared_write
+ @Test
+ public void testSRSWPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_WRITE, locks.get(0).getType());
+ }
+
+ // Test that existing shared_read partition with new shared_read coalesces to
+ // shared_read
+ @Test
+ public void testSRSRPart() {
+ LockRequestBuilder bldr = new LockRequestBuilder();
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp);
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypart");
+ bldr.addLockComponent(comp).setUser("fred");
+ LockRequest req = bldr.build();
+ List<LockComponent> locks = req.getComponent();
+ Assert.assertEquals(1, locks.size());
+ Assert.assertEquals(LockType.SHARED_READ, locks.get(0).getType());
+ }
+}
Added: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java?rev=1574266&view=auto
==============================================================================
--- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java (added)
+++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java Wed Mar 5 00:20:53 2014
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import junit.framework.Assert;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.Assert.*;
+
+/**
+ * Tests for TxnHandler.
+ */
+public class TestCompactionTxnHandler {
+
+ private HiveConf conf = new HiveConf();
+ private CompactionTxnHandler txnHandler;
+
+ public TestCompactionTxnHandler() throws Exception {
+ TxnDbUtil.setConfValues(conf);
+ LogManager.getLogger(TxnHandler.class.getName()).setLevel(Level.DEBUG);
+ tearDown();
+ }
+
+ @Test
+ public void testFindNextToCompact() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ long now = System.currentTimeMillis();
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+ assertEquals("foo", ci.dbname);
+ assertEquals("bar", ci.tableName);
+ assertEquals("ds=today", ci.partName);
+ assertEquals(CompactionType.MINOR, ci.type);
+ assertNull(ci.runAs);
+ assertNull(txnHandler.findNextToCompact("fred"));
+
+ txnHandler.setRunAs(ci.id, "bob");
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ assertEquals(1, compacts.size());
+ ShowCompactResponseElement c = compacts.get(0);
+ assertEquals("foo", c.getDbname());
+ assertEquals("bar", c.getTablename());
+ assertEquals("ds=today", c.getPartitionname());
+ assertEquals(CompactionType.MINOR, c.getType());
+ assertEquals("working", c.getState());
+ assertTrue(c.getStart() - 5000 < now && c.getStart() + 5000 > now);
+ assertEquals("fred", c.getWorkerid());
+ assertEquals("bob", c.getRunAs());
+ }
+
+ @Test
+ public void testFindNextToCompact2() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+
+ rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ rqst.setPartitionname("ds=yesterday");
+ txnHandler.compact(rqst);
+
+ long now = System.currentTimeMillis();
+ boolean expectToday = false;
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+ assertEquals("foo", ci.dbname);
+ assertEquals("bar", ci.tableName);
+ if ("ds=today".equals(ci.partName)) expectToday = false;
+ else if ("ds=yesterday".equals(ci.partName)) expectToday = true;
+ else fail("partition name should have been today or yesterday but was " + ci.partName);
+ assertEquals(CompactionType.MINOR, ci.type);
+
+ ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+ assertEquals("foo", ci.dbname);
+ assertEquals("bar", ci.tableName);
+ if (expectToday) assertEquals("ds=today", ci.partName);
+ else assertEquals("ds=yesterday", ci.partName);
+ assertEquals(CompactionType.MINOR, ci.type);
+
+ assertNull(txnHandler.findNextToCompact("fred"));
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ assertEquals(2, compacts.size());
+ for (ShowCompactResponseElement e : compacts) {
+ assertEquals("working", e.getState());
+ assertTrue(e.getStart() - 5000 < now && e.getStart() + 5000 > now);
+ assertEquals("fred", e.getWorkerid());
+ }
+ }
+
+ @Test
+ public void testFindNextToCompactNothingToCompact() throws Exception {
+ assertNull(txnHandler.findNextToCompact("fred"));
+ }
+
+ @Test
+ public void testMarkCompacted() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+
+ txnHandler.markCompacted(ci);
+ assertNull(txnHandler.findNextToCompact("fred"));
+
+
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ assertEquals(1, compacts.size());
+ ShowCompactResponseElement c = compacts.get(0);
+ assertEquals("foo", c.getDbname());
+ assertEquals("bar", c.getTablename());
+ assertEquals("ds=today", c.getPartitionname());
+ assertEquals(CompactionType.MINOR, c.getType());
+ assertEquals("ready for cleaning", c.getState());
+ assertNull(c.getWorkerid());
+ }
+
+ @Test
+ public void testFindNextToClean() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ assertEquals(0, txnHandler.findReadyToClean().size());
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+
+ assertEquals(0, txnHandler.findReadyToClean().size());
+ txnHandler.markCompacted(ci);
+ assertNull(txnHandler.findNextToCompact("fred"));
+
+ List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+ assertEquals(1, toClean.size());
+ assertNull(txnHandler.findNextToCompact("fred"));
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ assertEquals(1, compacts.size());
+ ShowCompactResponseElement c = compacts.get(0);
+ assertEquals("foo", c.getDbname());
+ assertEquals("bar", c.getTablename());
+ assertEquals("ds=today", c.getPartitionname());
+ assertEquals(CompactionType.MINOR, c.getType());
+ assertEquals("ready for cleaning", c.getState());
+ assertNull(c.getWorkerid());
+ }
+
+ @Test
+ public void testMarkCleaned() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ assertEquals(0, txnHandler.findReadyToClean().size());
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+
+ assertEquals(0, txnHandler.findReadyToClean().size());
+ txnHandler.markCompacted(ci);
+ assertNull(txnHandler.findNextToCompact("fred"));
+
+ List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+ assertEquals(1, toClean.size());
+ assertNull(txnHandler.findNextToCompact("fred"));
+ txnHandler.markCleaned(ci);
+ assertNull(txnHandler.findNextToCompact("fred"));
+ assertEquals(0, txnHandler.findReadyToClean().size());
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ assertNull(rsp.getCompacts());
+ }
+
+ @Test
+ public void testRevokeFromLocalWorkers() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ assertNotNull(txnHandler.findNextToCompact("fred-193892"));
+ assertNotNull(txnHandler.findNextToCompact("bob-193892"));
+ assertNotNull(txnHandler.findNextToCompact("fred-193893"));
+ txnHandler.revokeFromLocalWorkers("fred");
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ assertEquals(3, compacts.size());
+ boolean sawWorkingBob = false;
+ int initiatedCount = 0;
+ for (ShowCompactResponseElement c : compacts) {
+ if (c.getState().equals("working")) {
+ assertEquals("bob-193892", c.getWorkerid());
+ sawWorkingBob = true;
+ } else if (c.getState().equals("initiated")) {
+ initiatedCount++;
+ } else {
+ fail("Unexpected state");
+ }
+ }
+ assertTrue(sawWorkingBob);
+ assertEquals(2, initiatedCount);
+ }
+
+ @Test
+ public void testRevokeTimedOutWorkers() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+
+ assertNotNull(txnHandler.findNextToCompact("fred-193892"));
+ Thread.sleep(200);
+ assertNotNull(txnHandler.findNextToCompact("fred-193892"));
+ txnHandler.revokeTimedoutWorkers(100);
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ assertEquals(2, compacts.size());
+ boolean sawWorking = false, sawInitiated = false;
+ for (ShowCompactResponseElement c : compacts) {
+ if (c.getState().equals("working")) sawWorking = true;
+ else if (c.getState().equals("initiated")) sawInitiated = true;
+ else fail("Unexpected state");
+ }
+ assertTrue(sawWorking);
+ assertTrue(sawInitiated);
+ }
+
+ @Test
+ public void testLockNoWait() throws Exception {
+ // Test that we can acquire the lock alone
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB,
+ "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lockNoWait(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ txnHandler.unlock(new UnlockRequest(res.getLockid()));
+
+ // test that another lock blocks it
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB,
+ "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertEquals(LockState.ACQUIRED, res.getState());
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB,
+ "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lockNoWait(req);
+ assertEquals(LockState.NOT_ACQUIRED, res.getState());
+ assertEquals(1, TxnDbUtil.findNumCurrentLocks());
+ }
+
+ @Test
+ public void testFindPotentialCompactions() throws Exception {
+ // Test that committing unlocks
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
+ "mydb");
+ comp.setTablename("mytable");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
+ "mydb");
+ comp.setTablename("yourtable");
+ comp.setPartitionname("mypartition");
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+ assertEquals(0, txnHandler.numLocksInLockTable());
+
+ List<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
+ assertEquals(2, potentials.size());
+ boolean sawMyTable = false, sawYourTable = false;
+ for (CompactionInfo ci : potentials) {
+ sawMyTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("mytable") &&
+ ci.partName == null);
+ sawYourTable |= (ci.dbname.equals("mydb") && ci.tableName.equals("yourtable") &&
+ ci.partName.equals("mypartition"));
+ }
+ assertTrue(sawMyTable);
+ assertTrue(sawYourTable);
+ }
+
+ // TODO test changes to mark cleaned to clean txns and txn_components
+
+ @Test
+ public void testMarkCleanedCleansTxnsAndTxnComponents()
+ throws Exception {
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB,
+ "mydb");
+ comp.setTablename("mytable");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+ txnid = openTxn();
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("yourtable");
+ components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+ txnid = openTxn();
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("foo");
+ comp.setPartitionname("bar");
+ components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("foo");
+ comp.setPartitionname("baz");
+ components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+ CompactionInfo ci = new CompactionInfo();
+
+ // Now clean them and check that they are removed from the count.
+ CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+ assertEquals(0, txnHandler.findReadyToClean().size());
+ ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+ txnHandler.markCompacted(ci);
+
+ List<CompactionInfo> toClean = txnHandler.findReadyToClean();
+ assertEquals(1, toClean.size());
+ txnHandler.markCleaned(ci);
+
+ // Check that we are cleaning up the empty aborted transactions
+ GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
+ assertEquals(3, txnList.getOpen_txnsSize());
+ txnHandler.cleanEmptyAbortedTxns();
+ txnList = txnHandler.getOpenTxns();
+ assertEquals(2, txnList.getOpen_txnsSize());
+
+ rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
+ rqst.setPartitionname("bar");
+ txnHandler.compact(rqst);
+ assertEquals(0, txnHandler.findReadyToClean().size());
+ ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+ txnHandler.markCompacted(ci);
+
+ toClean = txnHandler.findReadyToClean();
+ assertEquals(1, toClean.size());
+ txnHandler.markCleaned(ci);
+
+ txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
+ txnHandler.cleanEmptyAbortedTxns();
+ txnList = txnHandler.getOpenTxns();
+ assertEquals(3, txnList.getOpen_txnsSize());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ TxnDbUtil.prepDb();
+ txnHandler = new CompactionTxnHandler(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TxnDbUtil.cleanDb();
+ }
+
+ private long openTxn() throws MetaException {
+ List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
+ return txns.get(0);
+ }
+
+}
Added: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java?rev=1574266&view=auto
==============================================================================
--- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java (added)
+++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java Wed Mar 5 00:20:53 2014
@@ -0,0 +1,1079 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.Assert.*;
+
+/**
+ * Tests for TxnHandler.
+ */
+public class TestTxnHandler {
+ static final private String CLASS_NAME = TxnHandler.class.getName();
+ static final private Log LOG = LogFactory.getLog(CLASS_NAME);
+
+ private HiveConf conf = new HiveConf();
+ private TxnHandler txnHandler;
+
+ public TestTxnHandler() throws Exception {
+ TxnDbUtil.setConfValues(conf);
+ LogManager.getLogger(TxnHandler.class.getName()).setLevel(Level.DEBUG);
+ tearDown();
+ }
+
+ @Test
+ public void testValidTxnsEmpty() throws Exception {
+ GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+ assertEquals(0L, txnsInfo.getTxn_high_water_mark());
+ assertTrue(txnsInfo.getOpen_txns().isEmpty());
+ GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+ assertEquals(0L, txns.getTxn_high_water_mark());
+ assertTrue(txns.getOpen_txns().isEmpty());
+ }
+
+ @Test
+ public void testOpenTxn() throws Exception {
+ long first = openTxn();
+ assertEquals(1L, first);
+ long second = openTxn();
+ assertEquals(2L, second);
+ GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+ assertEquals(2L, txnsInfo.getTxn_high_water_mark());
+ assertEquals(2, txnsInfo.getOpen_txns().size());
+ assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
+ assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(0).getState());
+ assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
+ assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
+ assertEquals("me", txnsInfo.getOpen_txns().get(1).getUser());
+ assertEquals("localhost", txnsInfo.getOpen_txns().get(1).getHostname());
+
+ GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+ assertEquals(2L, txns.getTxn_high_water_mark());
+ assertEquals(2, txns.getOpen_txns().size());
+ boolean[] saw = new boolean[3];
+ for (int i = 0; i < saw.length; i++) saw[i] = false;
+ for (Long tid : txns.getOpen_txns()) {
+ saw[tid.intValue()] = true;
+ }
+ for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
+ }
+
+ @Test
+ public void testAbortTxn() throws Exception {
+ OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost"));
+ List<Long> txnList = openedTxns.getTxn_ids();
+ long first = txnList.get(0);
+ assertEquals(1L, first);
+ long second = txnList.get(1);
+ assertEquals(2L, second);
+ txnHandler.abortTxn(new AbortTxnRequest(1));
+ GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+ assertEquals(2L, txnsInfo.getTxn_high_water_mark());
+ assertEquals(2, txnsInfo.getOpen_txns().size());
+ assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
+ assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
+ assertEquals(2L, txnsInfo.getOpen_txns().get(1).getId());
+ assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
+
+ GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+ assertEquals(2L, txns.getTxn_high_water_mark());
+ assertEquals(2, txns.getOpen_txns().size());
+ boolean[] saw = new boolean[3];
+ for (int i = 0; i < saw.length; i++) saw[i] = false;
+ for (Long tid : txns.getOpen_txns()) {
+ saw[tid.intValue()] = true;
+ }
+ for (int i = 1; i < saw.length; i++) assertTrue(saw[i]);
+ }
+
+ @Test
+ public void testAbortInvalidTxn() throws Exception {
+ boolean caught = false;
+ try {
+ txnHandler.abortTxn(new AbortTxnRequest(195L));
+ } catch (NoSuchTxnException e) {
+ caught = true;
+ }
+ assertTrue(caught);
+ }
+
+ @Test
+ public void testValidTxnsNoneOpen() throws Exception {
+ txnHandler.openTxns(new OpenTxnRequest(2, "me", "localhost"));
+ txnHandler.commitTxn(new CommitTxnRequest(1));
+ txnHandler.commitTxn(new CommitTxnRequest(2));
+ GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+ assertEquals(2L, txnsInfo.getTxn_high_water_mark());
+ assertEquals(0, txnsInfo.getOpen_txns().size());
+ GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+ assertEquals(2L, txns.getTxn_high_water_mark());
+ assertEquals(0, txns.getOpen_txns().size());
+ }
+
+ @Test
+ public void testValidTxnsSomeOpen() throws Exception {
+ txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
+ txnHandler.abortTxn(new AbortTxnRequest(1));
+ txnHandler.commitTxn(new CommitTxnRequest(2));
+ GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+ assertEquals(3L, txnsInfo.getTxn_high_water_mark());
+ assertEquals(2, txnsInfo.getOpen_txns().size());
+ assertEquals(1L, txnsInfo.getOpen_txns().get(0).getId());
+ assertEquals(TxnState.ABORTED, txnsInfo.getOpen_txns().get(0).getState());
+ assertEquals(3L, txnsInfo.getOpen_txns().get(1).getId());
+ assertEquals(TxnState.OPEN, txnsInfo.getOpen_txns().get(1).getState());
+
+ GetOpenTxnsResponse txns = txnHandler.getOpenTxns();
+ assertEquals(3L, txns.getTxn_high_water_mark());
+ assertEquals(2, txns.getOpen_txns().size());
+ boolean[] saw = new boolean[4];
+ for (int i = 0; i < saw.length; i++) saw[i] = false;
+ for (Long tid : txns.getOpen_txns()) {
+ saw[tid.intValue()] = true;
+ }
+ assertTrue(saw[1]);
+ assertFalse(saw[2]);
+ assertTrue(saw[3]);
+ }
+
+ @Test
+ public void testLockDifferentDBs() throws Exception {
+ // Test that two different databases don't collide on their locks
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockSameDB() throws Exception {
+ // Test that two different databases don't collide on their locks
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockDbLocksTable() throws Exception {
+ // Test that locking a database prevents locking of tables in the database
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockDbDoesNotLockTableInDifferentDB() throws Exception {
+ // Test that locking a database prevents locking of tables in the database
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "yourdb");
+ comp.setTablename("mytable");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockDifferentTables() throws Exception {
+ // Test that two different tables don't collide on their locks
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("yourtable");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockSameTable() throws Exception {
+ // Test that two different tables don't collide on their locks
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockTableLocksPartition() throws Exception {
+ // Test that locking a table prevents locking of partitions of the table
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockDifferentTableDoesntLockPartition() throws Exception {
+ // Test that locking a table prevents locking of partitions of the table
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("yourtable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockDifferentPartitions() throws Exception {
+ // Test that two different partitions don't collide on their locks
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("yourpartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockSamePartition() throws Exception {
+ // Test that two different partitions don't collide on their locks
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockSRSR() throws Exception {
+ // Test that two shared read locks can share a partition
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockESRSR() throws Exception {
+ // Test that exclusive lock blocks shared reads
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockSRSW() throws Exception {
+ // Test that write can acquire after read
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockESRSW() throws Exception {
+ // Test that exclusive lock blocks read and write
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockSRE() throws Exception {
+ // Test that read blocks exclusive
+ LockComponent comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockESRE() throws Exception {
+ // Test that exclusive blocks read and exclusive
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockSWSR() throws Exception {
+ // Test that read can acquire after write
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockSWSWSR() throws Exception {
+ // Test that write blocks write but read can still acquire
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testLockSWSWSW() throws Exception {
+ // Test that write blocks two writes
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockEESW() throws Exception {
+ // Test that exclusive blocks exclusive and write
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testLockEESR() throws Exception {
+ // Test that exclusive blocks exclusive and read
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.WAITING);
+ }
+
+ @Test
+ public void testCheckLockAcquireAfterWaiting() throws Exception {
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ long lockid1 = res.getLockid();
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components.clear();
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ long lockid2 = res.getLockid();
+ assertTrue(res.getState() == LockState.WAITING);
+
+ txnHandler.unlock(new UnlockRequest(lockid1));
+ res = txnHandler.checkLock(new CheckLockRequest(lockid2));
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testCheckLockNoSuchLock() throws Exception {
+ try {
+ txnHandler.checkLock(new CheckLockRequest(23L));
+ fail("Allowed to check lock on non-existent lock");
+ } catch (NoSuchLockException e) {
+ }
+ }
+
+ @Test
+ public void testCheckLockTxnAborted() throws Exception {
+ // Test that when a transaction is aborted, the heartbeat fails
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ long lockid = res.getLockid();
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ try {
+ // This will throw NoSuchLockException (even though it's the
+ // transaction we've closed) because that will have deleted the lock.
+ txnHandler.checkLock(new CheckLockRequest(lockid));
+ fail("Allowed to check lock on aborted transaction.");
+ } catch (NoSuchLockException e) {
+ }
+ }
+
+ @Test
+ public void testMultipleLock() throws Exception {
+ // Test more than one lock can be handled in a lock request
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(2);
+ components.add(comp);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("anotherpartition");
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ long lockid = res.getLockid();
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ res = txnHandler.checkLock(new CheckLockRequest(lockid));
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ txnHandler.unlock(new UnlockRequest(lockid));
+ assertEquals(0, txnHandler.numLocksInLockTable());
+ }
+
+ @Test
+ public void testMultipleLockWait() throws Exception {
+ // Test that two shared read locks can share a partition
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(2);
+ components.add(comp);
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("anotherpartition");
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ long lockid1 = res.getLockid();
+ assertTrue(res.getState() == LockState.ACQUIRED);
+
+
+ comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ res = txnHandler.lock(req);
+ long lockid2 = res.getLockid();
+ assertTrue(res.getState() == LockState.WAITING);
+
+ txnHandler.unlock(new UnlockRequest(lockid1));
+
+ res = txnHandler.checkLock(new CheckLockRequest(lockid2));
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ }
+
+ @Test
+ public void testUnlockOnCommit() throws Exception {
+ // Test that committing unlocks
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+ assertEquals(0, txnHandler.numLocksInLockTable());
+ }
+
+ @Test
+ public void testUnlockOnAbort() throws Exception {
+ // Test that committing unlocks
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ assertEquals(0, txnHandler.numLocksInLockTable());
+ }
+
+ @Test
+ public void testUnlockWithTxn() throws Exception {
+ LOG.debug("Starting testUnlockWithTxn");
+ // Test that attempting to unlock locks associated with a transaction
+ // generates an error
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ long lockid = res.getLockid();
+ try {
+ txnHandler.unlock(new UnlockRequest(lockid));
+ fail("Allowed to unlock lock associated with transaction.");
+ } catch (TxnOpenException e) {
+ }
+ }
+
+ @Test
+ public void testHeartbeatTxnAborted() throws Exception {
+ // Test that when a transaction is aborted, the heartbeat fails
+ openTxn();
+ txnHandler.abortTxn(new AbortTxnRequest(1));
+ HeartbeatRequest h = new HeartbeatRequest();
+ h.setTxnid(1);
+ try {
+ txnHandler.heartbeat(h);
+ fail("Told there was a txn, when it should have been aborted.");
+ } catch (TxnAbortedException e) {
+ }
+ }
+
+ @Test
+ public void testHeartbeatNoTxn() throws Exception {
+ // Test that when a transaction is aborted, the heartbeat fails
+ HeartbeatRequest h = new HeartbeatRequest();
+ h.setTxnid(939393L);
+ try {
+ txnHandler.heartbeat(h);
+ fail("Told there was a txn, when there wasn't.");
+ } catch (NoSuchTxnException e) {
+ }
+ }
+
+ @Test
+ public void testHeartbeatLock() throws Exception {
+ conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1);
+ HeartbeatRequest h = new HeartbeatRequest();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ h.setLockid(res.getLockid());
+ for (int i = 0; i < 30; i++) {
+ try {
+ txnHandler.heartbeat(h);
+ } catch (NoSuchLockException e) {
+ fail("Told there was no lock, when the heartbeat should have kept it.");
+ }
+ }
+ }
+
+ @Test
+ public void testLockTimeout() throws Exception {
+ long timeout = txnHandler.setTimeout(1);
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ comp.setTablename("mytable");
+ comp.setPartitionname("mypartition");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ LockResponse res = txnHandler.lock(req);
+ assertTrue(res.getState() == LockState.ACQUIRED);
+ Thread.currentThread().sleep(10);
+ try {
+ txnHandler.checkLock(new CheckLockRequest(res.getLockid()));
+ fail("Told there was a lock, when it should have timed out.");
+ } catch (NoSuchLockException e) {
+ } finally {
+ txnHandler.setTimeout(timeout);
+ }
+ }
+
+ @Test
+ public void testHeartbeatNoLock() throws Exception {
+ HeartbeatRequest h = new HeartbeatRequest();
+ h.setLockid(29389839L);
+ try {
+ txnHandler.heartbeat(h);
+ fail("Told there was a lock, when there wasn't.");
+ } catch (NoSuchLockException e) {
+ }
+ }
+
+ @Ignore // This test breaks the others when it unsets the value
+ @Test
+ public void testNoJDBCDriver() throws Exception {
+ HiveConf confCopy = new HiveConf(conf);
+ confCopy.unset(HiveConf.ConfVars.HIVE_TXN_JDBC_DRIVER.varname);
+ boolean sawException = false;
+ try {
+ TxnHandler tt = new TxnHandler(confCopy);
+ } catch (Exception e) {
+ if (e instanceof RuntimeException && e.getMessage().contains("JDBC " +
+ "driver for transaction db not set")) {
+ sawException = true;
+ } else {
+ throw e;
+ }
+ }
+ assertTrue(sawException);
+ }
+
+ @Test
+ public void testCompactMajorWithPartition() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MAJOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ assertEquals(1, compacts.size());
+ ShowCompactResponseElement c = compacts.get(0);
+ assertEquals("foo", c.getDbname());
+ assertEquals("bar", c.getTablename());
+ assertEquals("ds=today", c.getPartitionname());
+ assertEquals(CompactionType.MAJOR, c.getType());
+ assertEquals("initiated", c.getState());
+ assertEquals(0L, c.getStart());
+ }
+
+ @Test
+ public void testCompactMinorNoPartition() throws Exception {
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ rqst.setRunas("fred");
+ txnHandler.compact(rqst);
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ assertEquals(1, compacts.size());
+ ShowCompactResponseElement c = compacts.get(0);
+ assertEquals("foo", c.getDbname());
+ assertEquals("bar", c.getTablename());
+ assertNull(c.getPartitionname());
+ assertEquals(CompactionType.MINOR, c.getType());
+ assertEquals("initiated", c.getState());
+ assertEquals(0L, c.getStart());
+ assertEquals("fred", c.getRunAs());
+ }
+
+ @Test
+ public void showLocks() throws Exception {
+ long begining = System.currentTimeMillis();
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+
+ // Open txn
+ txnid = openTxn();
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb");
+ comp.setTablename("mytable");
+ components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ res = txnHandler.lock(req);
+
+ // Locks not associated with a txn
+ components = new ArrayList<LockComponent>(1);
+ comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "yourdb");
+ comp.setTablename("yourtable");
+ comp.setPartitionname("yourpartition");
+ components.add(comp);
+ req = new LockRequest(components, "you", "remotehost");
+ res = txnHandler.lock(req);
+
+ ShowLocksResponse rsp = txnHandler.showLocks(new ShowLocksRequest());
+ List<ShowLocksResponseElement> locks = rsp.getLocks();
+ assertEquals(3, locks.size());
+ boolean[] saw = new boolean[locks.size()];
+ for (int i = 0; i < saw.length; i++) saw[i] = false;
+ for (ShowLocksResponseElement lock : locks) {
+ if (lock.getLockid() == 1) {
+ assertEquals(1, lock.getTxnid());
+ assertEquals("mydb", lock.getDbname());
+ assertNull(lock.getTablename());
+ assertNull(lock.getPartname());
+ assertEquals(LockState.ACQUIRED, lock.getState());
+ assertEquals(LockType.EXCLUSIVE, lock.getType());
+ assertTrue(begining <= lock.getLastheartbeat() &&
+ System.currentTimeMillis() >= lock.getLastheartbeat());
+ assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
+ + " and " + System.currentTimeMillis(),
+ begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
+ assertEquals("me", lock.getUser());
+ assertEquals("localhost", lock.getHostname());
+ saw[0] = true;
+ } else if (lock.getLockid() == 2) {
+ assertEquals(2, lock.getTxnid());
+ assertEquals("mydb", lock.getDbname());
+ assertEquals("mytable", lock.getTablename());
+ assertNull(lock.getPartname());
+ assertEquals(LockState.WAITING, lock.getState());
+ assertEquals(LockType.SHARED_READ, lock.getType());
+ assertTrue(begining <= lock.getLastheartbeat() &&
+ System.currentTimeMillis() >= lock.getLastheartbeat());
+ assertEquals(0, lock.getAcquiredat());
+ assertEquals("me", lock.getUser());
+ assertEquals("localhost", lock.getHostname());
+ saw[1] = true;
+ } else if (lock.getLockid() == 3) {
+ assertEquals(0, lock.getTxnid());
+ assertEquals("yourdb", lock.getDbname());
+ assertEquals("yourtable", lock.getTablename());
+ assertEquals("yourpartition", lock.getPartname());
+ assertEquals(LockState.ACQUIRED, lock.getState());
+ assertEquals(LockType.SHARED_WRITE, lock.getType());
+ assertTrue(begining <= lock.getLastheartbeat() &&
+ System.currentTimeMillis() >= lock.getLastheartbeat());
+ assertTrue(begining <= lock.getAcquiredat() &&
+ System.currentTimeMillis() >= lock.getAcquiredat());
+ assertEquals("you", lock.getUser());
+ assertEquals("remotehost", lock.getHostname());
+ saw[2] = true;
+ } else {
+ fail("Unknown lock id");
+ }
+ }
+ for (int i = 0; i < saw.length; i++) assertTrue("Didn't see lock id " + i, saw[i]);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ TxnDbUtil.prepDb();
+ txnHandler = new TxnHandler(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TxnDbUtil.cleanDb();
+ }
+
+ private long openTxn() throws MetaException {
+ List<Long> txns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost")).getTxn_ids();
+ return txns.get(0);
+ }
+
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1574266&r1=1574265&r2=1574266&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Wed Mar 5 00:20:53 2014
@@ -18,18 +18,6 @@
package org.apache.hadoop.hive.ql;
-import java.io.DataInput;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.antlr.runtime.TokenRewriteStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,10 +34,23 @@ import org.apache.hadoop.hive.ql.hooks.W
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.StringUtils;
+import java.io.DataInput;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Context for Semantic Analyzers. Usage: not reusable - construct a new one for
* each query should call clear() at end of use to remove temporary folders
@@ -93,6 +94,9 @@ public class Context {
protected List<HiveLock> hiveLocks;
protected HiveLockManager hiveLockMgr;
+ // Transaction manager for this query
+ protected HiveTxnManager hiveTxnManager;
+
private boolean needLockMgr;
// Keep track of the mapping from load table desc to the output and the lock
@@ -533,15 +537,12 @@ public class Context {
this.hiveLocks = hiveLocks;
}
- public HiveLockManager getHiveLockMgr() {
- if (hiveLockMgr != null) {
- hiveLockMgr.refresh();
- }
- return hiveLockMgr;
+ public HiveTxnManager getHiveTxnManager() {
+ return hiveTxnManager;
}
- public void setHiveLockMgr(HiveLockManager hiveLockMgr) {
- this.hiveLockMgr = hiveLockMgr;
+ public void setHiveTxnManager(HiveTxnManager txnMgr) {
+ hiveTxnManager = txnMgr;
}
public void setOriginalTracker(String originalTracker) {