You are viewing a plain text version of this content; the canonical (HTML) version is available from the original mailing-list archive entry.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/05/11 07:40:52 UTC
[hive] branch master updated: HIVE-23349: ACID: Concurrent MERGE
INSERT operations produce duplicates (Denys Kuzmenko, reviewed by Marton Bod,
Peter Varga, Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ffba728 HIVE-23349: ACID: Concurrent MERGE INSERT operations produce duplicates (Denys Kuzmenko, reviewed by Marton Bod, Peter Varga, Peter Vary)
ffba728 is described below
commit ffba728281dde1ed742cbdf523707265ec4f9381
Author: Denys Kuzmenko <dk...@apache.org>
AuthorDate: Mon May 11 09:38:42 2020 +0200
HIVE-23349: ACID: Concurrent MERGE INSERT operations produce duplicates (Denys Kuzmenko, reviewed by Marton Bod, Peter Varga, Peter Vary)
---
ql/src/java/org/apache/hadoop/hive/ql/Context.java | 4 +
.../apache/hadoop/hive/ql/exec/ExplainTask.java | 10 ++-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 17 ++++-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 3 +-
.../hive/ql/parse/MergeSemanticAnalyzer.java | 1 +
.../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 2 +-
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 88 ++++++++++++++++++++++
.../clientpositive/llap/explain_locks.q.out | 4 +-
8 files changed, 121 insertions(+), 8 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9f59d4c..5e92980 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -181,6 +181,10 @@ public class Context {
this.operation = operation;
}
+ public Operation getOperation() {
+ return operation;
+ }
+
public WmContext getWmContext() {
return wmContext;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index c1f94d1..750abcb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -37,6 +37,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.VectorizationDetailLevel;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
@@ -372,7 +375,12 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable {
if (jsonOutput) {
out = null;
}
- List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(), conf);
+ Operation operation = Optional.of(work).map(ExplainWork::getParseContext)
+ .map(ParseContext::getContext).map(Context::getOperation)
+ .orElse(Operation.OTHER);
+
+ List<LockComponent> lockComponents = AcidUtils.makeLockComponents(work.getOutputs(), work.getInputs(),
+ operation, conf);
if (null != out) {
out.print("LOCK INFORMATION:\n");
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 66404ab..270c590 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -2921,11 +2922,13 @@ public class AcidUtils {
* @return list with lock components
*/
public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, Set<ReadEntity> inputs,
- HiveConf conf) {
+ Context.Operation operation, HiveConf conf) {
+
List<LockComponent> lockComponents = new ArrayList<>();
boolean skipReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_READ_LOCKS);
boolean skipNonAcidReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_NONACID_READ_LOCKS);
boolean sharedWrite = !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK);
+ boolean isMerge = operation == Context.Operation.MERGE;
// For each source to read, get a shared_read lock
for (ReadEntity input : inputs) {
@@ -3040,9 +3043,17 @@ public class AcidUtils {
assert t != null;
if (AcidUtils.isTransactionalTable(t)) {
if (sharedWrite) {
- compBuilder.setSharedWrite();
+ if (!isMerge) {
+ compBuilder.setSharedWrite();
+ } else {
+ compBuilder.setExclWrite();
+ }
} else {
- compBuilder.setSharedRead();
+ if (!isMerge) {
+ compBuilder.setSharedRead();
+ } else {
+ compBuilder.setExclusive();
+ }
}
} else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
final HiveStorageHandler storageHandler = Preconditions.checkNotNull(t.getStorageHandler(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index deaab89..71afcbd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -421,7 +421,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
LOG.debug("No locks needed for queryId=" + queryId);
return null;
}
- List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf);
+ List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(),
+ ctx.getOperation(), conf);
lockComponents.addAll(getGlobalLocks(ctx.getConf()));
//It's possible there's nothing to lock even if we have w/r entities.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
index 3ffdcec..cea7b11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -59,6 +59,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
"MergeSemanticAnalyzer");
}
+ ctx.setOperation(Context.Operation.MERGE);
analyzeMerge(tree);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index b435e79..a5c7831 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -54,11 +54,11 @@ public abstract class DbTxnManagerEndToEndTestBase {
@Before
public void setUp() throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
SessionState.start(conf);
ctx = new Context(conf);
driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
+ conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
TxnDbUtil.cleanDb(conf);
SessionState ss = SessionState.get();
ss.initTxnMgr(conf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 2adabe7..8a15b7c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2189,6 +2189,94 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
}
}
+ @Test
+ public void testInsertMergeInsertLocking() throws Exception {
+ testMergeInsertLocking(false);
+ }
+ @Test
+ public void testInsertMergeInsertLockingSharedWrite() throws Exception {
+ testMergeInsertLocking(true);
+ }
+
+ private void testMergeInsertLocking(boolean sharedWrite) throws Exception {
+ dropTable(new String[]{"target", "source"});
+ conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+ driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+ driver.run("insert into target values (1,2), (3,4)");
+ driver.run("create table source (a int, b int)");
+ driver.run("insert into source values (5,6), (7,8)");
+
+ driver.compileAndRespond("insert into target values (5, 6)");
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+
+ driver.compileAndRespond("merge into target t using source s on t.a = s.a " +
+ "when not matched then insert values (s.a, s.b)");
+ txnMgr2.acquireLocks(driver.getPlan(), driver.getContext(), "T2", false);
+ List<ShowLocksResponseElement> locks = getLocks();
+
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock((sharedWrite ? LockType.SHARED_WRITE : LockType.SHARED_READ),
+ LockState.ACQUIRED, "default", "target", null, locks);
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source", null, locks);
+ checkLock((sharedWrite ? LockType.EXCL_WRITE : LockType.EXCLUSIVE),
+ LockState.WAITING, "default", "target", null, locks);
+ }
+
+ @Test
+ public void test2MergeInsertsConcurrentNoDuplicates() throws Exception {
+ testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
+ "when not matched then insert values (s.a, s.b)", false);
+ }
+ @Test
+ public void test2MergeInsertsConcurrentSharedWriteNoDuplicates() throws Exception {
+ testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
+ "when not matched then insert values (s.a, s.b)", true);
+ }
+ @Test
+ public void testtInsertMergeInsertConcurrentNoDuplicates() throws Exception {
+ testConcurrentMergeInsertNoDuplicates("insert into target values (5, 6)", false);
+ }
+ @Test
+ public void testtInsertMergeInsertConcurrentSharedWriteNoDuplicates() throws Exception {
+ testConcurrentMergeInsertNoDuplicates("insert into target values (5, 6)", true);
+ }
+
+ private void testConcurrentMergeInsertNoDuplicates(String query, boolean sharedWrite) throws Exception {
+ dropTable(new String[]{"target", "source"});
+ conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+ driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+ driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+ driver.run("insert into target values (1,2), (3,4)");
+ driver.run("create table source (a int, b int)");
+ driver.run("insert into source values (5,6), (7,8)");
+
+ driver.compileAndRespond(query);
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+
+ driver2.compileAndRespond("merge into target t using source s on t.a = s.a " +
+ "when not matched then insert values (s.a, s.b)");
+
+ swapTxnManager(txnMgr);
+ driver.run();
+
+ //merge should notice snapshot changes and re-create it
+ swapTxnManager(txnMgr2);
+ driver2.run();
+
+ swapTxnManager(txnMgr);
+ driver.run("select * from target");
+ List res = new ArrayList();
+ driver.getFetchTask().fetch(res);
+ Assert.assertEquals("Duplicate records found", 4, res.size());
+ }
+
/**
* Check that DP with partial spec properly updates TXN_COMPONENTS
*/
diff --git a/ql/src/test/results/clientpositive/llap/explain_locks.q.out b/ql/src/test/results/clientpositive/llap/explain_locks.q.out
index d62f6cc..fb795df 100644
--- a/ql/src/test/results/clientpositive/llap/explain_locks.q.out
+++ b/ql/src/test/results/clientpositive/llap/explain_locks.q.out
@@ -150,7 +150,7 @@ default.target.p=1/q=3 -> EXCL_WRITE
default.target.p=1/q=3 -> EXCL_WRITE
default.target.p=1/q=2 -> EXCL_WRITE
default.target.p=1/q=2 -> EXCL_WRITE
-default.target -> SHARED_READ
+default.target -> EXCLUSIVE
PREHOOK: query: explain locks update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
PREHOOK: type: QUERY
PREHOOK: Input: default@source
@@ -248,4 +248,4 @@ default.target.p=1/q=3 -> SHARED_WRITE
default.target.p=1/q=3 -> SHARED_WRITE
default.target.p=1/q=2 -> SHARED_WRITE
default.target.p=1/q=2 -> SHARED_WRITE
-default.target -> SHARED_WRITE
+default.target -> EXCL_WRITE