You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/05/04 00:24:39 UTC

[1/2] hive git commit: HIVE-11848 - tables in subqueries don't get locked (Eugene Koifman, reviewed by Wei Zheng)

Repository: hive
Updated Branches:
  refs/heads/branch-1 ab2951237 -> bfc249632


HIVE-11848 - tables in subqueries don't get locked (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0780218f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0780218f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0780218f

Branch: refs/heads/branch-1
Commit: 0780218f2e075e43aa9051313b1e13b034d778ae
Parents: ab29512
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue May 3 14:51:38 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue May 3 14:51:38 2016 -0700

----------------------------------------------------------------------
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  | 16 +++++++++-
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 33 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0780218f/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 4c69534..a80c093 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -321,7 +321,9 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     // Walk through all our inputs and set them to note that this read is part of an update or a
     // delete.
     for (ReadEntity input : inputs) {
-      input.setUpdateOrDelete(true);
+      if(isWritten(input)) {
+        input.setUpdateOrDelete(true);
+      }
     }
 
     if (inputIsPartitioned(inputs)) {
@@ -369,6 +371,18 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     }
   }
 
+  /**
+   * Check that {@code readEntity} is also being written
+   */
+  private boolean isWritten(Entity readEntity) {
+    for(Entity writeEntity : outputs) {
+      //make sure to compare them as Entity, i.e. that it's the same table or partition, etc
+      if(writeEntity.toString().equalsIgnoreCase(readEntity.toString())) {
+        return true;
+      }
+    }
+    return false;
+  }
   private String operation() {
     if (updating()) return "update";
     else if (deleting()) return "delete";

http://git-wip-us.apache.org/repos/asf/hive/blob/0780218f/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 0a91348..42c7064 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -72,6 +72,39 @@ public class TestDbTxnManager2 {
     TxnDbUtil.prepDb();
   }
   @Test
+  public void testLocksInSubquery() throws Exception {
+    checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)"));
+    checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
+    checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
+
+    checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)"));
+    txnMgr.openTxn("one");
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks.get(1));
+    txnMgr.rollbackTxn();
+
+    checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)"));
+    txnMgr.openTxn("one");
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "S", null, locks.get(1));
+    txnMgr.rollbackTxn();
+
+    checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)"));
+    txnMgr.openTxn("three");
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "three");
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks.get(0));
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "S", null, locks.get(1));
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "R", null, locks.get(2));
+  }
+  @Test
   public void createTable() throws Exception {
     CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)");
     checkCmdOnDriver(cpr);


[2/2] hive git commit: HIVE-13213 make DbLockManger work for non-acid resources (Eugene Koifman, reviewed by Alan Gates)

Posted by ek...@apache.org.
HIVE-13213 make DbLockManger work for non-acid resources (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bfc24963
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bfc24963
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bfc24963

Branch: refs/heads/branch-1
Commit: bfc249632378c1b9c12c059c817b2c6227c7e0e7
Parents: 0780218
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Tue May 3 14:52:50 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Tue May 3 14:52:50 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  5 ++
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 12 +++
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 22 ++++++
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 81 ++++++++++++++++++++
 4 files changed, 120 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index ffd450a..f7ef88e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -89,6 +89,11 @@ import java.util.regex.Pattern;
  * If we ever decide to run remote Derby server, according to
  * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be
  * seriazlied, so that would also work though has not been tested.
+ *
+ * General design note:
+ * It's imperative that any operation on a txn (e.g. commit), ensure (atomically) that this txn is
+ * still valid and active.  In the code this is usually achieved at the same time the txn record
+ * is locked for some operation.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index a9867ef..28ee8a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -213,6 +214,17 @@ public class DbTxnManager extends HiveTxnManagerImpl {
           break;
 
         case INSERT:
+          t = output.getTable();
+          if(t == null) {
+            throw new IllegalStateException("No table info for " + output);
+          }
+          if(AcidUtils.isAcidTable(t)) {
+            compBuilder.setShared();
+          }
+          else {
+            compBuilder.setExclusive();
+          }
+          break;
         case DDL_SHARED:
           compBuilder.setShared();
           break;

http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 1bddecb..a901074 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -433,6 +433,28 @@ public class TestTxnCommands2 {
   }
 
   /**
+   * Test update that hits multiple partitions (i.e. requries dynamic partition insert to process)
+   * @throws Exception
+   */
+  @Test
+  public void updateDeletePartitioned() throws Exception {
+    int[][] tableData = {{1,2},{3,4},{5,6}};
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData));
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR));
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+    runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = b + 1 where a = 3");
+    txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.toString(), CompactionType.MAJOR));
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+    List<String> rs = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
+    int[][] expectedData = {{1,1,2},{1,3,5},{1,5,6},{2,1,2},{2,3,5},{2,5,6}};
+    Assert.assertEquals("Update " + Table.ACIDTBLPART + " didn't match:", stringifyValues(expectedData), rs);
+  }
+
+  /**
    * https://issues.apache.org/jira/browse/HIVE-10151
    */
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/bfc24963/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 42c7064..0e2bfc0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -524,6 +524,87 @@ public class TestDbTxnManager2 {
     Assert.assertEquals(0, count);
   }
 
+  /**
+   * collection of queries where we ensure that we get the locks that are expected
+   * @throws Exception
+   */
+  @Test
+  public void checkExpectedLocks() throws Exception {
+    CommandProcessorResponse cpr = null;
+    cpr = driver.run("create table acidPart(a int, b int) partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table nonAcidPart(a int, b int) partitioned by (p string) stored as orc");
+    checkCmdOnDriver(cpr);
+
+    cpr = driver.compileAndRespond("insert into nonAcidPart partition(p) values(1,2,3)");
+    checkCmdOnDriver(cpr);
+    LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+    List<ShowLocksResponseElement> locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__1", null, locks.get(0));
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks.get(1));
+    List<HiveLock> relLocks = new ArrayList<HiveLock>(2);
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+    txnMgr.getLockManager().releaseLocks(relLocks);
+
+    cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)");
+    checkCmdOnDriver(cpr);
+    lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__2", null, locks.get(0));
+    checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", "p=1", locks.get(1));
+    relLocks = new ArrayList<HiveLock>(2);
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+    txnMgr.getLockManager().releaseLocks(relLocks);
+
+    cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)");
+    checkCmdOnDriver(cpr);
+    lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__3", null, locks.get(0));
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks.get(1));
+    relLocks = new ArrayList<HiveLock>(2);
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+    txnMgr.getLockManager().releaseLocks(relLocks);
+
+    cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)");
+    checkCmdOnDriver(cpr);
+    lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "values__tmp__table__4", null, locks.get(0));
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks.get(1));
+    relLocks = new ArrayList<HiveLock>(2);
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
+    txnMgr.getLockManager().releaseLocks(relLocks);
+
+    cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1");
+    checkCmdOnDriver(cpr);
+    lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks.get(0));
+    relLocks = new ArrayList<HiveLock>(2);
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    txnMgr.getLockManager().releaseLocks(relLocks);
+
+    cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1");
+    checkCmdOnDriver(cpr);
+    lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
+    locks = getLocks();
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks.get(0));//https://issues.apache.org/jira/browse/HIVE-13212
+    relLocks = new ArrayList<HiveLock>(2);
+    relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
+    txnMgr.getLockManager().releaseLocks(relLocks);
+  }
+
   private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
     Assert.assertEquals(l.toString(),l.getType(), type);
     Assert.assertEquals(l.toString(),l.getState(), state);