You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/05/24 09:45:41 UTC

[hive] branch master updated: HIVE-26252: Missing locks in case of MERGE with multiple branches (Denys Kuzmenko, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2500b2dfb19 HIVE-26252: Missing locks in case of MERGE with multiple branches (Denys Kuzmenko, reviewed by Peter Vary)
2500b2dfb19 is described below

commit 2500b2dfb19b9ed60f88ea64d3d35e410b34961f
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Tue May 24 11:45:25 2022 +0200

    HIVE-26252: Missing locks in case of MERGE with multiple branches (Denys Kuzmenko, reviewed by Peter Vary)
    
    Closes #3308
---
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  | 83 ++++++++++++++++++++++
 .../hadoop/hive/metastore/LockRequestBuilder.java  |  6 +-
 2 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index eaf1a3cd73d..e0e62263ee7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2396,6 +2396,89 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
     Assert.assertEquals("Lost Update", "[earl\t10, amy\t10]", res.toString());
   }
 
+  @Test
+  public void testMergeMultipleBranchesOptimistic() throws Exception {
+    dropTable(new String[]{"target", "src1", "src2"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
+    conf.setBoolVar(HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, false);
+    // Scenario: with both X-lock flags off (conf above, driver2 below), two MERGEs into target overlap.
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, false);
+    // target starts with ids 0, 5, 10. NOTE(review): the first id is given as the string literal '0'.
+    driver.run("create table target (id int, txt string) stored as orc TBLPROPERTIES('transactional'='true')");
+    driver.run("insert into target values " +
+      "('0', 'orig_FyZl'), " +
+      "(5, 'orig_VsbLsaG'), " +
+      "(10, 'orig_dhhCassOoV')");
+    // src1 holds ids 0..10, so a MERGE from it matches 3 target rows and leaves 8 ids unmatched.
+    driver.run("create table src1 (id int, txt string) stored as orc TBLPROPERTIES('transactional'='true')");
+    driver.run("insert into src1 values " +
+      "(0, 'new1_tnlGat'), " +
+      "(1, 'new1_KulBf'), " +
+      "(2, 'new1_zkLGuU'), " +
+      "(3, 'new1_jznZVac')," +
+      "(4, 'new1_hdyazJXL')," +
+      "(5, 'new1_gxclXFtP')," +
+      "(6, 'new1_CNZr')," +
+      "(7, 'new1_GoBjjuow')," +
+      "(8, 'new1_vRfY')," +
+      "(9, 'new1_bdnQA')," +
+      "(10, 'new1_FNboL')");
+    // src2 holds the same ids 0..10 with different (new2_*) txt values.
+    driver.run("create table src2 (id int, txt string) stored as orc TBLPROPERTIES('transactional'='true')");
+    driver.run("insert into src2 values " +
+      "(0, 'new2_Cjdj'), " +
+      "(1, 'new2_GysxGF'), " +
+      "(2, 'new2_ToHyf'), " +
+      "(3, 'new2_HZjkahVJ')," +
+      "(4, 'new2_qcySYYUul')," +
+      "(5, 'new2_FupKyDcVcJ')," +
+      "(6, 'new2_DAcCwakVr')," +
+      "(7, 'new2_nZozPAZKI')," +
+      "(8, 'new2_bjdEmdRp')," +
+      "(9, 'new2_PkRAwdJeLX')," +
+      "(10, 'new2_aGSuZHx')");
+    // Compile the first MERGE on driver without running it yet: update matched rows, insert the rest.
+    driver.compileAndRespond("MERGE INTO target t USING src1 s ON t.id = s.id " +
+      "WHEN MATCHED THEN UPDATE SET txt = CONCAT_WS(' ',t.txt,s.txt) " +
+      "WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.txt)");
+    // Swap in a second transaction manager (txnMgr2) and compile the same MERGE shape from src2 on driver2.
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    driver2.compileAndRespond("MERGE INTO target t USING src2 s ON t.id = s.id " +
+      "WHEN MATCHED THEN UPDATE SET txt = CONCAT_WS(' ',t.txt,s.txt) " +
+      "WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.txt)");
+    // Switch back to txnMgr and run the first (src1) MERGE.
+    swapTxnManager(txnMgr);
+    driver.run();
+    // Run the second MERGE. NOTE(review): the catch assertions below are skipped if nothing is thrown.
+    swapTxnManager(txnMgr2);
+    try {
+      driver2.run();
+    } catch (Exception ex) {
+      Assert.assertTrue(ex.getCause() instanceof LockException);
+      Assert.assertTrue(ex.getMessage().matches(".*Aborting .* due to a write conflict on default/target.*"));
+    }
+    swapTxnManager(txnMgr);
+    driver.run("select * from target order by id");
+    // Final check: only src1's changes are expected in target - 11 rows and no new2_ text.
+    List<String> res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals(11, res.size());
+    Assert.assertEquals(
+      "[0\torig_FyZl new1_tnlGat, " +
+      "1\tnew1_KulBf, " +
+      "2\tnew1_zkLGuU, " +
+      "3\tnew1_jznZVac, " +
+      "4\tnew1_hdyazJXL, " +
+      "5\torig_VsbLsaG new1_gxclXFtP, " +
+      "6\tnew1_CNZr, " +
+      "7\tnew1_GoBjjuow, " +
+      "8\tnew1_vRfY, " +
+      "9\tnew1_bdnQA, " +
+      "10\torig_dhhCassOoV new1_FNboL]", res.toString());
+  }
+  
   @Test
   public void testConcurrent2InsertOverwritesDiffPartitions() throws Exception {
     testConcurrent2InsertOverwrites(false);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
index b43410d44c4..f731f71e199 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore;
 
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 
@@ -168,8 +169,9 @@ public class LockRequestBuilder {
       if (existing == null) {
         // No existing lock for this partition.
         parts.put(comp.getPartitionname(), comp);
-      }  else if (lockTypeComparator.compare(comp.getType(), existing.getType()) > 0) {
-        // We only need to promote if comp.type is > existing.type.
+      } else if (lockTypeComparator.compare(comp.getType(), existing.getType()) > 0
+          || comp.getType() == existing.getType() && existing.getOperationType() == DataOperationType.INSERT) {
+        // Promote if comp.type > existing.type, or on equal types when existing is an INSERT (don't mask update/delete)
         parts.put(comp.getPartitionname(), comp);
       }
     }