Posted to commits@hive.apache.org by dk...@apache.org on 2020/05/11 07:40:52 UTC

[hive] branch master updated: HIVE-23349: ACID: Concurrent MERGE INSERT operations produce duplicates (Denys Kuzmenko, reviewed by Marton Bod, Peter Varga, Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ffba728  HIVE-23349: ACID: Concurrent MERGE INSERT operations produce duplicates (Denys Kuzmenko, reviewed by Marton Bod, Peter Varga, Peter Vary)
ffba728 is described below

commit ffba728281dde1ed742cbdf523707265ec4f9381
Author: Denys Kuzmenko <dk...@apache.org>
AuthorDate: Mon May 11 09:38:42 2020 +0200

    HIVE-23349: ACID: Concurrent MERGE INSERT operations produce duplicates (Denys Kuzmenko, reviewed by Marton Bod, Peter Varga, Peter Vary)
---
 ql/src/java/org/apache/hadoop/hive/ql/Context.java |  4 +
 .../apache/hadoop/hive/ql/exec/ExplainTask.java    | 10 ++-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    | 17 ++++-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       |  3 +-
 .../hive/ql/parse/MergeSemanticAnalyzer.java       |  1 +
 .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java   |  2 +-
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  | 88 ++++++++++++++++++++++
 .../clientpositive/llap/explain_locks.q.out        |  4 +-
 8 files changed, 121 insertions(+), 8 deletions(-)
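
For context, the race this patch closes: two concurrent MERGE statements each take a snapshot of the target table, each finds no match for the same source row, and each runs its WHEN NOT MATCHED INSERT branch. A minimal, self-contained Java model of that interleaving (illustrative only, not Hive code):

    import java.util.ArrayList;
    import java.util.List;

    // Model of the pre-patch race: both "transactions" snapshot the target
    // before either one writes, so both NOT MATCHED branches fire.
    public class MergeRaceSketch {
      public static void main(String[] args) {
        List<Integer> target = new ArrayList<>(List.of(1, 3));
        List<Integer> snapshotA = new ArrayList<>(target); // MERGE #1 snapshot
        List<Integer> snapshotB = new ArrayList<>(target); // MERGE #2 snapshot

        if (!snapshotA.contains(5)) target.add(5); // MERGE #1: not matched -> insert
        if (!snapshotB.contains(5)) target.add(5); // MERGE #2: stale snapshot -> insert again
        System.out.println(target); // [1, 3, 5, 5] -- the duplicate
      }
    }

The changes below make a MERGE take an exclusive(-write) lock on the target, so the second MERGE blocks until the first commits; after re-reading, its NOT MATCHED branch no longer fires.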

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9f59d4c..5e92980 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -181,6 +181,10 @@ public class Context {
     this.operation = operation;
   }
 
+  public Operation getOperation() {
+    return operation;
+  }
+
   public WmContext getWmContext() {
     return wmContext;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index c1f94d1..750abcb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -37,6 +37,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -49,6 +50,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.VectorizationDetailLevel;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -372,7 +375,12 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
     if (jsonOutput) {
       out = null;
     }
-    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf);
+    Operation operation = Optional.of(work).map(ExplainWork::getParseContext)
+        .map(ParseContext::getContext).map(Context::getOperation)
+        .orElse(Operation.OTHER);
+
+    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(),
+        operation, conf);
     if (null != out) {
       out.print("LOCK INFORMATION:\n");
     }
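
The Optional chain above degrades gracefully when EXPLAIN runs without a full parse context: if any link is null, the operation defaults to Operation.OTHER. An equivalent null-check spelling, as a sketch using only the getters shown in this patch:

    Operation operation = Operation.OTHER;
    ParseContext pCtx = work.getParseContext();
    if (pCtx != null && pCtx.getContext() != null) {
      operation = pCtx.getContext().getOperation();
    }
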
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 66404ab..270c590 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -2921,11 +2922,13 @@ public class AcidUtils {
    * @return list with lock components
    */
   public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, Set<ReadEntity> inputs,
-      HiveConf conf) {
+      Context.Operation operation, HiveConf conf) {
+
     List<LockComponent> lockComponents = new ArrayList<>();
     boolean skipReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_READ_LOCKS);
     boolean skipNonAcidReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_NONACID_READ_LOCKS);
     boolean sharedWrite = !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK);
+    boolean isMerge = operation == Context.Operation.MERGE;
 
     // For each source to read, get a shared_read lock
     for (ReadEntity input : inputs) {
@@ -3040,9 +3043,17 @@ public class AcidUtils {
         assert t != null;
         if (AcidUtils.isTransactionalTable(t)) {
           if (sharedWrite) {
-            compBuilder.setSharedWrite();
+            if (!isMerge) {
+              compBuilder.setSharedWrite();
+            } else {
+              compBuilder.setExclWrite();
+            }
           } else {
-            compBuilder.setSharedRead();
+            if (!isMerge) {
+              compBuilder.setSharedRead();
+            } else {
+              compBuilder.setExclusive();
+            }
           }
         } else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
           final HiveStorageHandler storageHandler = Preconditions.checkNotNull(t.getStorageHandler(),
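
Taken together, the branches above form a two-by-two lock matrix for a transactional write target. A compact restatement (hypothetical helper, not part of the patch; the LockType constants are the ones the tests below assert on):

    import org.apache.hadoop.hive.metastore.api.LockType;

    // Hypothetical summary of the branch added above: MERGE escalates the
    // write lock one level, so two concurrent MERGE INSERTs can no longer
    // both pass their WHEN NOT MATCHED check against stale snapshots.
    final class MergeLockMatrix {
      static LockType writeLockFor(boolean sharedWrite, boolean isMerge) {
        if (sharedWrite) {                        // TXN_WRITE_X_LOCK == false
          return isMerge ? LockType.EXCL_WRITE : LockType.SHARED_WRITE;
        }                                         // TXN_WRITE_X_LOCK == true (default)
        return isMerge ? LockType.EXCLUSIVE : LockType.SHARED_READ;
      }
    }
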
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index deaab89..71afcbd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -421,7 +421,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       LOG.debug("No locks needed for queryId=" + queryId);
       return null;
     }
-    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf);
+    List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(),
+        ctx.getOperation(), conf);
     lockComponents.addAll(getGlobalLocks(ctx.getConf()));
 
     //It's possible there's nothing to lock even if we have w/r entities.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
index 3ffdcec..cea7b11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -59,6 +59,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
       throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
           "MergeSemanticAnalyzer");
     }
+    ctx.setOperation(Context.Operation.MERGE);
     analyzeMerge(tree);
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index b435e79..a5c7831 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -54,11 +54,11 @@ public abstract class DbTxnManagerEndToEndTestBase {
 
   @Before
   public void setUp() throws Exception {
-    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
     SessionState.start(conf);
     ctx = new Context(conf);
     driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
     driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
     TxnDbUtil.cleanDb(conf);
     SessionState ss = SessionState.get();
     ss.initTxnMgr(conf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 2adabe7..8a15b7c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2189,6 +2189,94 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
     }
   }
 
+  @Test
+  public void testInsertMergeInsertLocking() throws Exception {
+    testMergeInsertLocking(false);
+  }
+  @Test
+  public void testInsertMergeInsertLockingSharedWrite() throws Exception {
+    testMergeInsertLocking(true);
+  }
+
+  private void testMergeInsertLocking(boolean sharedWrite) throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond("insert into target values (5, 6)");
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+
+    driver.compileAndRespond("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)");
+    txnMgr2.acquireLocks(driver.getPlan(), driver.getContext(), "T2", false);
+    List<ShowLocksResponseElement> locks = getLocks();
+
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ),
+        LockState.ACQUIRED, "default", "target", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source", null, locks);
+    checkLock((sharedWrite ? LockType.EXCL_WRITE : LockType.EXCLUSIVE),
+        LockState.WAITING, "default", "target", null, locks);
+  }
+
+  @Test
+  public void test2MergeInsertsConcurrentNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)", false);
+  }
+  @Test
+  public void test2MergeInsertsConcurrentSharedWriteNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)", true);
+  }
+  @Test
+  public void testInsertMergeInsertConcurrentNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("insert into target values (5, 6)", false);
+  }
+  @Test
+  public void testInsertMergeInsertConcurrentSharedWriteNoDuplicates() throws Exception {
+    testConcurrentMergeInsertNoDuplicates("insert into target values (5, 6)", true);
+  }
+
+  private void testConcurrentMergeInsertNoDuplicates(String query, boolean sharedWrite) throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond(query);
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+
+    driver2.compileAndRespond("merge into target t using source s on t.a = s.a " +
+        "when not matched then insert values (s.a, s.b)");
+
+    swapTxnManager(txnMgr);
+    driver.run();
+
+    // the merge should detect the snapshot change and re-create its snapshot
+    swapTxnManager(txnMgr2);
+    driver2.run();
+
+    swapTxnManager(txnMgr);
+    driver.run("select * from target");
+    List<String> res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals("Duplicate records found", 4, res.size());
+  }
+
   /**
    * Check that DP with partial spec properly updates TXN_COMPONENTS
    */
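
The final assertion in testConcurrentMergeInsertNoDuplicates counts rows (4 expected, i.e. no duplicates). A more direct duplicate probe in the same style, should it ever fail (sketch; driver and the fetch pattern are the fixtures used in the tests above):

    driver.run("select a, b from target group by a, b having count(*) > 1");
    List<String> dupes = new ArrayList<>();
    driver.getFetchTask().fetch(dupes);
    Assert.assertTrue("Duplicate rows: " + dupes, dupes.isEmpty());
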
diff --git a/ql/src/test/results/clientpositive/llap/explain_locks.q.out b/ql/src/test/results/clientpositive/llap/explain_locks.q.out
index d62f6cc..fb795df 100644
--- a/ql/src/test/results/clientpositive/llap/explain_locks.q.out
+++ b/ql/src/test/results/clientpositive/llap/explain_locks.q.out
@@ -150,7 +150,7 @@ default.target.p=1/q=3 -> EXCL_WRITE
 default.target.p=1/q=3 -> EXCL_WRITE
 default.target.p=1/q=2 -> EXCL_WRITE
 default.target.p=1/q=2 -> EXCL_WRITE
-default.target -> SHARED_READ
+default.target -> EXCLUSIVE
 PREHOOK: query: explain locks update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@source
@@ -248,4 +248,4 @@ default.target.p=1/q=3 -> SHARED_WRITE
 default.target.p=1/q=3 -> SHARED_WRITE
 default.target.p=1/q=2 -> SHARED_WRITE
 default.target.p=1/q=2 -> SHARED_WRITE
-default.target -> SHARED_WRITE
+default.target -> EXCL_WRITE
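
These expectation changes can be reproduced from a test (sketch; the MERGE text mirrors the tests in this patch, and Driver.getResults is assumed as the fetch helper for EXPLAIN output):

    driver.run("explain locks merge into target t using source s on t.a = s.a "
        + "when not matched then insert values (s.a, s.b)");
    List<String> lockInfo = new ArrayList<>();
    driver.getResults(lockInfo);
    // with the default TXN_WRITE_X_LOCK=true, lockInfo contains a line like:
    //   default.target -> EXCLUSIVE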